Skip to content

Commit 4941cbf

Browse files
authored
Allow specifying read consistency (#4343)
* Closes #4340 - Specify Read Consistency * review changes * merge conflicts; correct imports
1 parent 10ef028 commit 4941cbf

File tree

5 files changed

+146
-111
lines changed

5 files changed

+146
-111
lines changed

datastore/google/cloud/datastore/client.py

+33-46
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,21 @@
1515

1616
import os
1717

18-
from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2
19-
2018
from google.cloud._helpers import _LocalStack
21-
from google.cloud._helpers import (
22-
_determine_default_project as _base_default_project)
19+
from google.cloud._helpers import (_determine_default_project as
20+
_base_default_project)
2321
from google.cloud.client import ClientWithProject
24-
from google.cloud.environment_vars import DISABLE_GRPC
25-
from google.cloud.environment_vars import GCD_DATASET
26-
from google.cloud.environment_vars import GCD_HOST
27-
28-
from google.cloud.datastore._http import HTTPDatastoreAPI
2922
from google.cloud.datastore import helpers
23+
from google.cloud.datastore._http import HTTPDatastoreAPI
3024
from google.cloud.datastore.batch import Batch
3125
from google.cloud.datastore.entity import Entity
3226
from google.cloud.datastore.key import Key
3327
from google.cloud.datastore.query import Query
3428
from google.cloud.datastore.transaction import Transaction
29+
from google.cloud.environment_vars import DISABLE_GRPC
30+
from google.cloud.environment_vars import GCD_DATASET
31+
from google.cloud.environment_vars import GCD_HOST
32+
3533
try:
3634
from google.cloud.datastore._gax import make_datastore_api
3735
_HAVE_GRPC = True
@@ -131,7 +129,7 @@ def _extended_lookup(datastore_api, project, key_pbs,
131129
results = []
132130

133131
loop_num = 0
134-
read_options = _get_read_options(eventual, transaction_id)
132+
read_options = helpers.get_read_options(eventual, transaction_id)
135133
while loop_num < _MAX_LOOPS: # loop against possible deferred.
136134
loop_num += 1
137135
lookup_response = datastore_api.lookup(
@@ -279,7 +277,8 @@ def current_transaction(self):
279277
if isinstance(transaction, Transaction):
280278
return transaction
281279

282-
def get(self, key, missing=None, deferred=None, transaction=None):
280+
def get(self, key, missing=None, deferred=None,
281+
transaction=None, eventual=False):
283282
"""Retrieve an entity from a single key (if it exists).
284283
285284
.. note::
@@ -305,15 +304,26 @@ def get(self, key, missing=None, deferred=None, transaction=None):
305304
:param transaction: (Optional) Transaction to use for read consistency.
306305
If not passed, uses current transaction, if set.
307306
307+
:type eventual: bool
308+
:param eventual: (Optional) Defaults to strongly consistent (False).
309+
Setting True will use eventual consistency, but cannot
310+
be used inside a transaction or will raise ValueError.
311+
308312
:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
309313
:returns: The requested entity if it exists.
314+
315+
:raises: :class:`ValueError` if eventual is True and in a transaction.
310316
"""
311-
entities = self.get_multi(keys=[key], missing=missing,
312-
deferred=deferred, transaction=transaction)
317+
entities = self.get_multi(keys=[key],
318+
missing=missing,
319+
deferred=deferred,
320+
transaction=transaction,
321+
eventual=eventual)
313322
if entities:
314323
return entities[0]
315324

316-
def get_multi(self, keys, missing=None, deferred=None, transaction=None):
325+
def get_multi(self, keys, missing=None, deferred=None,
326+
transaction=None, eventual=False):
317327
"""Retrieve entities, along with their attributes.
318328
319329
:type keys: list of :class:`google.cloud.datastore.key.Key`
@@ -334,10 +344,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
334344
:param transaction: (Optional) Transaction to use for read consistency.
335345
If not passed, uses current transaction, if set.
336346
347+
:type eventual: bool
348+
:param eventual: (Optional) Defaults to strongly consistent (False).
349+
Setting True will use eventual consistency,
350+
but cannot be used inside a transaction or
351+
will raise ValueError.
352+
337353
:rtype: list of :class:`google.cloud.datastore.entity.Entity`
338354
:returns: The requested entities.
339355
:raises: :class:`ValueError` if one or more of ``keys`` has a project
340356
which does not match our project.
357+
:raises: :class:`ValueError` if eventual is True and in a transaction.
341358
"""
342359
if not keys:
343360
return []
@@ -353,7 +370,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
353370
entity_pbs = _extended_lookup(
354371
datastore_api=self._datastore_api,
355372
project=self.project,
356-
key_pbs=[k.to_protobuf() for k in keys],
373+
key_pbs=[key.to_protobuf() for key in keys],
374+
eventual=eventual,
357375
missing=missing,
358376
deferred=deferred,
359377
transaction_id=transaction and transaction.id,
@@ -581,34 +599,3 @@ def do_something(entity):
581599
if 'namespace' not in kwargs:
582600
kwargs['namespace'] = self.namespace
583601
return Query(self, **kwargs)
584-
585-
586-
def _get_read_options(eventual, transaction_id):
587-
"""Validate rules for read options, and assign to the request.
588-
589-
Helper method for ``lookup()`` and ``run_query``.
590-
591-
:type eventual: bool
592-
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
593-
consistency should be used.
594-
595-
:type transaction_id: bytes
596-
:param transaction_id: A transaction identifier (may be null).
597-
598-
:rtype: :class:`.datastore_pb2.ReadOptions`
599-
:returns: The read options corresponding to the inputs.
600-
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
601-
``transaction_id`` is not ``None``.
602-
"""
603-
if transaction_id is None:
604-
if eventual:
605-
return _datastore_pb2.ReadOptions(
606-
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
607-
else:
608-
return _datastore_pb2.ReadOptions()
609-
else:
610-
if eventual:
611-
raise ValueError('eventual must be False when in a transaction')
612-
else:
613-
return _datastore_pb2.ReadOptions(
614-
transaction=transaction_id)

datastore/google/cloud/datastore/helpers.py

+37-5
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@
1919

2020
import datetime
2121
import itertools
22-
23-
from google.protobuf import struct_pb2
24-
from google.type import latlng_pb2
2522
import six
2623

2724
from google.cloud._helpers import _datetime_to_pb_timestamp
2825
from google.cloud._helpers import _pb_timestamp_to_datetime
29-
from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2
26+
from google.cloud.datastore_v1.proto import datastore_pb2
27+
from google.cloud.datastore_v1.proto import entity_pb2
3028
from google.cloud.datastore.entity import Entity
3129
from google.cloud.datastore.key import Key
3230

31+
from google.protobuf import struct_pb2
32+
from google.type import latlng_pb2
33+
3334

3435
def _get_meaning(value_pb, is_list=False):
3536
"""Get the meaning from a protobuf value.
@@ -204,7 +205,7 @@ def entity_to_protobuf(entity):
204205
:rtype: :class:`.entity_pb2.Entity`
205206
:returns: The protobuf representing the entity.
206207
"""
207-
entity_pb = _entity_pb2.Entity()
208+
entity_pb = entity_pb2.Entity()
208209
if entity.key is not None:
209210
key_pb = entity.key.to_protobuf()
210211
entity_pb.key.CopyFrom(key_pb)
@@ -233,6 +234,37 @@ def entity_to_protobuf(entity):
233234
return entity_pb
234235

235236

237+
def get_read_options(eventual, transaction_id):
238+
"""Validate rules for read options, and assign to the request.
239+
240+
Helper method for ``lookup()`` and ``run_query``.
241+
242+
:type eventual: bool
243+
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
244+
consistency should be used.
245+
246+
:type transaction_id: bytes
247+
:param transaction_id: A transaction identifier (may be null).
248+
249+
:rtype: :class:`.datastore_pb2.ReadOptions`
250+
:returns: The read options corresponding to the inputs.
251+
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
252+
``transaction_id`` is not ``None``.
253+
"""
254+
if transaction_id is None:
255+
if eventual:
256+
return datastore_pb2.ReadOptions(
257+
read_consistency=datastore_pb2.ReadOptions.EVENTUAL)
258+
else:
259+
return datastore_pb2.ReadOptions()
260+
else:
261+
if eventual:
262+
raise ValueError('eventual must be False when in a transaction')
263+
else:
264+
return datastore_pb2.ReadOptions(
265+
transaction=transaction_id)
266+
267+
236268
def key_from_protobuf(pb):
237269
"""Factory method for creating a key based on a protobuf.
238270

datastore/google/cloud/datastore/query.py

+41-25
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,18 @@
1919
from google.api_core import page_iterator
2020
from google.cloud._helpers import _ensure_tuple_or_list
2121

22-
from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2
23-
from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2
24-
from google.cloud.datastore_v1.proto import query_pb2 as _query_pb2
22+
from google.cloud.datastore_v1.proto import entity_pb2
23+
from google.cloud.datastore_v1.proto import query_pb2
2524
from google.cloud.datastore import helpers
2625
from google.cloud.datastore.key import Key
2726

2827

29-
_NOT_FINISHED = _query_pb2.QueryResultBatch.NOT_FINISHED
28+
_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED
3029

3130
_FINISHED = (
32-
_query_pb2.QueryResultBatch.NO_MORE_RESULTS,
33-
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
34-
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
31+
query_pb2.QueryResultBatch.NO_MORE_RESULTS,
32+
query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
33+
query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
3534
)
3635

3736

@@ -81,11 +80,11 @@ class Query(object):
8180
"""
8281

8382
OPERATORS = {
84-
'<=': _query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL,
85-
'>=': _query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL,
86-
'<': _query_pb2.PropertyFilter.LESS_THAN,
87-
'>': _query_pb2.PropertyFilter.GREATER_THAN,
88-
'=': _query_pb2.PropertyFilter.EQUAL,
83+
'<=': query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL,
84+
'>=': query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL,
85+
'<': query_pb2.PropertyFilter.LESS_THAN,
86+
'>': query_pb2.PropertyFilter.GREATER_THAN,
87+
'=': query_pb2.PropertyFilter.EQUAL,
8988
}
9089
"""Mapping of operator strings and their protobuf equivalents."""
9190

@@ -331,7 +330,7 @@ def distinct_on(self, value):
331330
self._distinct_on[:] = value
332331

333332
def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
334-
client=None):
333+
client=None, eventual=False):
335334
"""Execute the Query; return an iterator for the matching entities.
336335
337336
For example::
@@ -358,18 +357,28 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
358357
:param end_cursor: (Optional) cursor passed through to the iterator.
359358
360359
:type client: :class:`google.cloud.datastore.client.Client`
361-
:param client: client used to connect to datastore.
360+
:param client: (Optional) client used to connect to datastore.
362361
If not supplied, uses the query's value.
363362
363+
:type eventual: bool
364+
:param eventual: (Optional) Defaults to strongly consistent (False).
365+
Setting True will use eventual consistency,
366+
but cannot be used inside a transaction or
367+
will raise ValueError.
368+
364369
:rtype: :class:`Iterator`
365370
:returns: The iterator for the query.
366371
"""
367372
if client is None:
368373
client = self._client
369374

370-
return Iterator(
371-
self, client, limit=limit, offset=offset,
372-
start_cursor=start_cursor, end_cursor=end_cursor)
375+
return Iterator(self,
376+
client,
377+
limit=limit,
378+
offset=offset,
379+
start_cursor=start_cursor,
380+
end_cursor=end_cursor,
381+
eventual=eventual)
373382

374383

375384
class Iterator(page_iterator.Iterator):
@@ -396,18 +405,25 @@ class Iterator(page_iterator.Iterator):
396405
:type end_cursor: bytes
397406
:param end_cursor: (Optional) Cursor to end paging through
398407
query results.
408+
409+
:type eventual: bool
410+
:param eventual: (Optional) Defaults to strongly consistent (False).
411+
Setting True will use eventual consistency,
412+
but cannot be used inside a transaction or
413+
will raise ValueError.
399414
"""
400415

401416
next_page_token = None
402417

403418
def __init__(self, query, client, limit=None, offset=None,
404-
start_cursor=None, end_cursor=None):
419+
start_cursor=None, end_cursor=None, eventual=False):
405420
super(Iterator, self).__init__(
406421
client=client, item_to_value=_item_to_entity,
407422
page_token=start_cursor, max_results=limit)
408423
self._query = query
409424
self._offset = offset
410425
self._end_cursor = end_cursor
426+
self._eventual = eventual
411427
# The attributes below will change over the life of the iterator.
412428
self._more_results = True
413429
self._skipped_results = 0
@@ -483,12 +499,12 @@ def _next_page(self):
483499
query_pb = self._build_protobuf()
484500
transaction = self.client.current_transaction
485501
if transaction is None:
486-
read_options = _datastore_pb2.ReadOptions()
502+
transaction_id = None
487503
else:
488-
read_options = _datastore_pb2.ReadOptions(
489-
transaction=transaction.id)
504+
transaction_id = transaction.id
505+
read_options = helpers.get_read_options(self._eventual, transaction_id)
490506

491-
partition_id = _entity_pb2.PartitionId(
507+
partition_id = entity_pb2.PartitionId(
492508
project_id=self._query.project,
493509
namespace_id=self._query.namespace)
494510
response_pb = self.client._datastore_api.run_query(
@@ -512,7 +528,7 @@ def _pb_from_query(query):
512528
it does not contain "in-flight" fields for ongoing query
513529
executions (cursors, offset, limit).
514530
"""
515-
pb = _query_pb2.Query()
531+
pb = query_pb2.Query()
516532

517533
for projection_name in query.projection:
518534
pb.projection.add().property.name = projection_name
@@ -521,15 +537,15 @@ def _pb_from_query(query):
521537
pb.kind.add().name = query.kind
522538

523539
composite_filter = pb.filter.composite_filter
524-
composite_filter.op = _query_pb2.CompositeFilter.AND
540+
composite_filter.op = query_pb2.CompositeFilter.AND
525541

526542
if query.ancestor:
527543
ancestor_pb = query.ancestor.to_protobuf()
528544

529545
# Filter on __key__ HAS_ANCESTOR == ancestor.
530546
ancestor_filter = composite_filter.filters.add().property_filter
531547
ancestor_filter.property.name = '__key__'
532-
ancestor_filter.op = _query_pb2.PropertyFilter.HAS_ANCESTOR
548+
ancestor_filter.op = query_pb2.PropertyFilter.HAS_ANCESTOR
533549
ancestor_filter.value.key_value.CopyFrom(ancestor_pb)
534550

535551
for property_name, operator, value in query.filters:

0 commit comments

Comments
 (0)