Skip to content

Commit a79426d

Browse files
committed
Closes #4340 - Specify Read Consistency
1 parent 8806a6a commit a79426d

File tree

4 files changed

+98
-63
lines changed

4 files changed

+98
-63
lines changed

datastore/google/cloud/datastore/client.py

+37-46
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,24 @@
1515

1616
import os
1717

18-
from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2
19-
2018
from google.cloud._helpers import _LocalStack
21-
from google.cloud._helpers import (
22-
_determine_default_project as _base_default_project)
19+
from google.cloud._helpers import (_determine_default_project as
20+
_default_project)
21+
2322
from google.cloud.client import ClientWithProject
24-
from google.cloud.environment_vars import DISABLE_GRPC
25-
from google.cloud.environment_vars import GCD_DATASET
26-
from google.cloud.environment_vars import GCD_HOST
2723

28-
from google.cloud.datastore._http import HTTPDatastoreAPI
2924
from google.cloud.datastore import helpers
25+
from google.cloud.datastore._http import HTTPDatastoreAPI
3026
from google.cloud.datastore.batch import Batch
3127
from google.cloud.datastore.entity import Entity
3228
from google.cloud.datastore.key import Key
3329
from google.cloud.datastore.query import Query
3430
from google.cloud.datastore.transaction import Transaction
31+
32+
from google.cloud.environment_vars import DISABLE_GRPC
33+
from google.cloud.environment_vars import GCD_DATASET
34+
from google.cloud.environment_vars import GCD_HOST
35+
3536
try:
3637
from google.cloud.datastore._gax import make_datastore_api
3738
_HAVE_GRPC = True
@@ -74,7 +75,7 @@ def _determine_default_project(project=None):
7475
project = _get_gcd_project()
7576

7677
if project is None:
77-
project = _base_default_project(project=project)
78+
project = _default_project(project=project)
7879

7980
return project
8081

@@ -131,7 +132,7 @@ def _extended_lookup(datastore_api, project, key_pbs,
131132
results = []
132133

133134
loop_num = 0
134-
read_options = _get_read_options(eventual, transaction_id)
135+
read_options = helpers.get_read_options(eventual, transaction_id)
135136
while loop_num < _MAX_LOOPS: # loop against possible deferred.
136137
loop_num += 1
137138
lookup_response = datastore_api.lookup(
@@ -276,7 +277,8 @@ def current_transaction(self):
276277
if isinstance(transaction, Transaction):
277278
return transaction
278279

279-
def get(self, key, missing=None, deferred=None, transaction=None):
280+
def get(self, key, missing=None, deferred=None,
281+
transaction=None, eventual=False):
280282
"""Retrieve an entity from a single key (if it exists).
281283
282284
.. note::
@@ -302,15 +304,27 @@ def get(self, key, missing=None, deferred=None, transaction=None):
302304
:param transaction: (Optional) Transaction to use for read consistency.
303305
If not passed, uses current transaction, if set.
304306
307+
:type eventual: bool
308+
:param eventual: (Optional) Defaults to strongly consistent (False).
309+
Setting True will use eventual consistency,
310+
but cannot be used inside a transaction or
311+
will raise ValueError.
312+
305313
:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
306314
:returns: The requested entity if it exists.
315+
316+
:raises: :class:`ValueError` if eventual is True and in a transaction.
307317
"""
308-
entities = self.get_multi(keys=[key], missing=missing,
309-
deferred=deferred, transaction=transaction)
318+
entities = self.get_multi(keys=[key],
319+
missing=missing,
320+
deferred=deferred,
321+
transaction=transaction,
322+
eventual=eventual)
310323
if entities:
311324
return entities[0]
312325

313-
def get_multi(self, keys, missing=None, deferred=None, transaction=None):
326+
def get_multi(self, keys, missing=None, deferred=None,
327+
transaction=None, eventual=False):
314328
"""Retrieve entities, along with their attributes.
315329
316330
:type keys: list of :class:`google.cloud.datastore.key.Key`
@@ -331,10 +345,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
331345
:param transaction: (Optional) Transaction to use for read consistency.
332346
If not passed, uses current transaction, if set.
333347
348+
:type eventual: bool
349+
:param eventual: (Optional) Defaults to strongly consistent (False).
350+
Setting True will use eventual consistency,
351+
but cannot be used inside a transaction or
352+
will raise ValueError.
353+
334354
:rtype: list of :class:`google.cloud.datastore.entity.Entity`
335355
:returns: The requested entities.
336356
:raises: :class:`ValueError` if one or more of ``keys`` has a project
337357
which does not match our project.
358+
:raises: :class:`ValueError` if eventual is True and in a transaction.
338359
"""
339360
if not keys:
340361
return []
@@ -350,7 +371,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
350371
entity_pbs = _extended_lookup(
351372
datastore_api=self._datastore_api,
352373
project=self.project,
353-
key_pbs=[k.to_protobuf() for k in keys],
374+
key_pbs=[key.to_protobuf() for key in keys],
375+
eventual=eventual,
354376
missing=missing,
355377
deferred=deferred,
356378
transaction_id=transaction and transaction.id,
@@ -578,34 +600,3 @@ def do_something(entity):
578600
if 'namespace' not in kwargs:
579601
kwargs['namespace'] = self.namespace
580602
return Query(self, **kwargs)
581-
582-
583-
def _get_read_options(eventual, transaction_id):
584-
"""Validate rules for read options, and assign to the request.
585-
586-
Helper method for ``lookup()`` and ``run_query``.
587-
588-
:type eventual: bool
589-
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
590-
consistency should be used.
591-
592-
:type transaction_id: bytes
593-
:param transaction_id: A transaction identifier (may be null).
594-
595-
:rtype: :class:`.datastore_pb2.ReadOptions`
596-
:returns: The read options corresponding to the inputs.
597-
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
598-
``transaction_id`` is not ``None``.
599-
"""
600-
if transaction_id is None:
601-
if eventual:
602-
return _datastore_pb2.ReadOptions(
603-
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
604-
else:
605-
return _datastore_pb2.ReadOptions()
606-
else:
607-
if eventual:
608-
raise ValueError('eventual must be False when in a transaction')
609-
else:
610-
return _datastore_pb2.ReadOptions(
611-
transaction=transaction_id)

datastore/google/cloud/datastore/helpers.py

+36-4
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@
1919

2020
import datetime
2121
import itertools
22-
23-
from google.protobuf import struct_pb2
24-
from google.type import latlng_pb2
2522
import six
2623

2724
from google.cloud._helpers import _datetime_to_pb_timestamp
2825
from google.cloud._helpers import _pb_timestamp_to_datetime
29-
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
3026
from google.cloud.datastore.entity import Entity
3127
from google.cloud.datastore.key import Key
28+
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
29+
from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2
30+
31+
from google.protobuf import struct_pb2
32+
from google.type import latlng_pb2
3233

3334

3435
def _get_meaning(value_pb, is_list=False):
@@ -233,6 +234,37 @@ def entity_to_protobuf(entity):
233234
return entity_pb
234235

235236

237+
def get_read_options(eventual, transaction_id):
238+
"""Validate rules for read options, and assign to the request.
239+
240+
Helper method for ``lookup()`` and ``run_query``.
241+
242+
:type eventual: bool
243+
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
244+
consistency should be used.
245+
246+
:type transaction_id: bytes
247+
:param transaction_id: A transaction identifier (may be null).
248+
249+
:rtype: :class:`.datastore_pb2.ReadOptions`
250+
:returns: The read options corresponding to the inputs.
251+
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
252+
``transaction_id`` is not ``None``.
253+
"""
254+
if transaction_id is None:
255+
if eventual:
256+
return _datastore_pb2.ReadOptions(
257+
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
258+
else:
259+
return _datastore_pb2.ReadOptions()
260+
else:
261+
if eventual:
262+
raise ValueError('eventual must be False when in a transaction')
263+
else:
264+
return _datastore_pb2.ReadOptions(
265+
transaction=transaction_id)
266+
267+
236268
def key_from_protobuf(pb):
237269
"""Factory method for creating a key based on a protobuf.
238270

datastore/google/cloud/datastore/query.py

+21-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from google.api_core import page_iterator
2020
from google.cloud._helpers import _ensure_tuple_or_list
2121

22-
from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2
2322
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
2423
from google.cloud.proto.datastore.v1 import query_pb2 as _query_pb2
2524
from google.cloud.datastore import helpers
@@ -331,7 +330,7 @@ def distinct_on(self, value):
331330
self._distinct_on[:] = value
332331

333332
def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
334-
client=None):
333+
client=None, eventual=False):
335334
"""Execute the Query; return an iterator for the matching entities.
336335
337336
For example::
@@ -358,18 +357,24 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
358357
:param end_cursor: (Optional) cursor passed through to the iterator.
359358
360359
:type client: :class:`google.cloud.datastore.client.Client`
361-
:param client: client used to connect to datastore.
360+
:param client: (Optional) client used to connect to datastore.
362361
If not supplied, uses the query's value.
363362
363+
:type eventual: bool
364+
:param eventual: (Optional) Defaults to strongly consistent (False).
365+
Setting True will use eventual consistency,
366+
but cannot be used inside a transaction or
367+
will raise ValueError.
368+
364369
:rtype: :class:`Iterator`
365370
:returns: The iterator for the query.
366371
"""
367372
if client is None:
368373
client = self._client
369374

370375
return Iterator(
371-
self, client, limit=limit, offset=offset,
372-
start_cursor=start_cursor, end_cursor=end_cursor)
376+
self, client, limit=limit, offset=offset, start_cursor=start_cursor,
377+
end_cursor=end_cursor, eventual=eventual)
373378

374379

375380
class Iterator(page_iterator.Iterator):
@@ -396,18 +401,25 @@ class Iterator(page_iterator.Iterator):
396401
:type end_cursor: bytes
397402
:param end_cursor: (Optional) Cursor to end paging through
398403
query results.
404+
405+
:type eventual: bool
406+
:param eventual: (Optional) Defaults to strongly consistent (False).
407+
Setting True will use eventual consistency,
408+
but cannot be used inside a transaction or
409+
will raise ValueError.
399410
"""
400411

401412
next_page_token = None
402413

403414
def __init__(self, query, client, limit=None, offset=None,
404-
start_cursor=None, end_cursor=None):
415+
start_cursor=None, end_cursor=None, eventual=False):
405416
super(Iterator, self).__init__(
406417
client=client, item_to_value=_item_to_entity,
407418
page_token=start_cursor, max_results=limit)
408419
self._query = query
409420
self._offset = offset
410421
self._end_cursor = end_cursor
422+
self._eventual = eventual
411423
# The attributes below will change over the life of the iterator.
412424
self._more_results = True
413425
self._skipped_results = 0
@@ -483,10 +495,10 @@ def _next_page(self):
483495
query_pb = self._build_protobuf()
484496
transaction = self.client.current_transaction
485497
if transaction is None:
486-
read_options = _datastore_pb2.ReadOptions()
498+
transaction_id = None
487499
else:
488-
read_options = _datastore_pb2.ReadOptions(
489-
transaction=transaction.id)
500+
transaction_id = transaction.id
501+
read_options = helpers.get_read_options(self._eventual, transaction_id)
490502

491503
partition_id = _entity_pb2.PartitionId(
492504
project_id=self._query.project,

datastore/tests/unit/test_client.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def fallback_mock(project=None):
8585
patch = mock.patch.multiple(
8686
'google.cloud.datastore.client',
8787
_get_gcd_project=gcd_mock,
88-
_base_default_project=fallback_mock)
88+
_default_project=fallback_mock)
8989
with patch:
9090
returned_project = self._call_fut(project_called)
9191

@@ -138,7 +138,7 @@ def test_constructor_w_project_no_environ(self):
138138
# Some environments (e.g. AppVeyor CI) run in GCE, so
139139
# this test would fail artificially.
140140
patch = mock.patch(
141-
'google.cloud.datastore.client._base_default_project',
141+
'google.cloud.datastore.client._default_project',
142142
return_value=None)
143143
with patch:
144144
self.assertRaises(EnvironmentError, self._make_one, None)
@@ -1013,9 +1013,9 @@ def test_query_w_namespace_collision(self):
10131013
class Test__get_read_options(unittest.TestCase):
10141014

10151015
def _call_fut(self, eventual, transaction_id):
1016-
from google.cloud.datastore.client import _get_read_options
1016+
from google.cloud.datastore.helpers import get_read_options
10171017

1018-
return _get_read_options(eventual, transaction_id)
1018+
return get_read_options(eventual, transaction_id)
10191019

10201020
def test_eventual_w_transaction(self):
10211021
with self.assertRaises(ValueError):

0 commit comments

Comments
 (0)