Skip to content

Commit 193a1dd

Browse files
larkeeskuruppu
andauthored
feat(spanner): add resource based routing implementation (#10183)
* feat(spanner): implement resource routing * corrected warning message as per the PR comment * Update spanner/google/cloud/spanner_v1/database.py Add comma to warning message Co-Authored-By: skuruppu <[email protected]> Co-authored-by: skuruppu <[email protected]>
1 parent e710547 commit 193a1dd

File tree

5 files changed

+401
-7
lines changed

5 files changed

+401
-7
lines changed

spanner/google/cloud/spanner_v1/client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import warnings
2727

2828
from google.api_core.gapic_v1 import client_info
29+
import google.api_core.client_options
2930

3031
# pylint: disable=line-too-long
3132
from google.cloud.spanner_admin_database_v1.gapic.database_admin_client import ( # noqa
@@ -122,6 +123,7 @@ class Client(ClientWithProject):
122123

123124
_instance_admin_api = None
124125
_database_admin_api = None
126+
_endpoint_cache = {}
125127
user_agent = None
126128
_SET_PROJECT = True # Used by from_service_account_json()
127129

@@ -143,7 +145,12 @@ def __init__(
143145
project=project, credentials=credentials, _http=None
144146
)
145147
self._client_info = client_info
146-
self._client_options = client_options
148+
if client_options and type(client_options) == dict:
149+
self._client_options = google.api_core.client_options.from_dict(
150+
client_options
151+
)
152+
else:
153+
self._client_options = client_options
147154

148155
if user_agent is not None:
149156
warnings.warn(_USER_AGENT_DEPRECATED, DeprecationWarning, stacklevel=2)

spanner/google/cloud/spanner_v1/database.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
import copy
1818
import functools
19+
import os
1920
import re
2021
import threading
22+
import warnings
2123

24+
from google.api_core.client_options import ClientOptions
2225
import google.auth.credentials
2326
from google.protobuf.struct_pb2 import Struct
2427
from google.cloud.exceptions import NotFound
28+
from google.api_core.exceptions import PermissionDenied
2529
import six
2630

2731
# pylint: disable=ungrouped-imports
@@ -54,6 +58,19 @@
5458
)
5559

5660

61+
_RESOURCE_ROUTING_PERMISSIONS_WARNING = (
62+
"The client library attempted to connect to an endpoint closer to your Cloud Spanner data "
63+
"but was unable to do so. The client library will fall back and route requests to the endpoint "
64+
"given in the client options, which may result in increased latency. "
65+
"We recommend including the scope https://www.googleapis.com/auth/spanner.admin so that the "
66+
"client library can get an instance-specific endpoint and efficiently route requests."
67+
)
68+
69+
70+
class ResourceRoutingPermissionsWarning(Warning):
71+
pass
72+
73+
5774
class Database(object):
5875
"""Representation of a Cloud Spanner Database.
5976
@@ -178,6 +195,36 @@ def spanner_api(self):
178195
credentials = credentials.with_scopes((SPANNER_DATA_SCOPE,))
179196
client_info = self._instance._client._client_info
180197
client_options = self._instance._client._client_options
198+
if (
199+
os.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING")
200+
== "true"
201+
):
202+
endpoint_cache = self._instance._client._endpoint_cache
203+
if self._instance.name in endpoint_cache:
204+
client_options = ClientOptions(
205+
api_endpoint=endpoint_cache[self._instance.name]
206+
)
207+
else:
208+
try:
209+
api = self._instance._client.instance_admin_api
210+
resp = api.get_instance(
211+
self._instance.name,
212+
field_mask={"paths": ["endpoint_uris"]},
213+
metadata=_metadata_with_prefix(self.name),
214+
)
215+
endpoints = resp.endpoint_uris
216+
if endpoints:
217+
endpoint_cache[self._instance.name] = list(endpoints)[0]
218+
client_options = ClientOptions(
219+
api_endpoint=endpoint_cache[self._instance.name]
220+
)
221+
# If there are no endpoints, use default endpoint.
222+
except PermissionDenied:
223+
warnings.warn(
224+
_RESOURCE_ROUTING_PERMISSIONS_WARNING,
225+
ResourceRoutingPermissionsWarning,
226+
stacklevel=2,
227+
)
181228
self._spanner_api = SpannerClient(
182229
credentials=credentials,
183230
client_info=client_info,

spanner/tests/system/test_system.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656

5757

5858
CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None
59+
USE_RESOURCE_ROUTING = (
60+
os.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true"
61+
)
5962

6063
if CREATE_INSTANCE:
6164
INSTANCE_ID = "google-cloud" + unique_resource_id("-")
@@ -282,6 +285,61 @@ def tearDown(self):
282285
for doomed in self.to_delete:
283286
doomed.drop()
284287

288+
@unittest.skipUnless(USE_RESOURCE_ROUTING, "requires enabling resource routing")
289+
def test_spanner_api_use_user_specified_endpoint(self):
290+
# Clear cache.
291+
Client._endpoint_cache = {}
292+
api = Config.CLIENT.instance_admin_api
293+
resp = api.get_instance(
294+
Config.INSTANCE.name, field_mask={"paths": ["endpoint_uris"]}
295+
)
296+
if not resp or not resp.endpoint_uris:
297+
return # no resolved endpoint.
298+
resolved_endpoint = resp.endpoint_uris[0]
299+
300+
client = Client(client_options={"api_endpoint": resolved_endpoint})
301+
302+
instance = client.instance(Config.INSTANCE.instance_id)
303+
temp_db_id = "temp_db" + unique_resource_id("_")
304+
temp_db = instance.database(temp_db_id)
305+
temp_db.spanner_api
306+
307+
# No endpoint cache - Default endpoint used.
308+
self.assertEqual(client._endpoint_cache, {})
309+
310+
@unittest.skipUnless(USE_RESOURCE_ROUTING, "requires enabling resource routing")
311+
def test_spanner_api_use_resolved_endpoint(self):
312+
# Clear cache.
313+
Client._endpoint_cache = {}
314+
api = Config.CLIENT.instance_admin_api
315+
resp = api.get_instance(
316+
Config.INSTANCE.name, field_mask={"paths": ["endpoint_uris"]}
317+
)
318+
if not resp or not resp.endpoint_uris:
319+
return # no resolved endpoint.
320+
resolved_endpoint = resp.endpoint_uris[0]
321+
322+
client = Client(
323+
client_options=Config.CLIENT._client_options
324+
) # Use same endpoint as main client.
325+
326+
instance = client.instance(Config.INSTANCE.instance_id)
327+
temp_db_id = "temp_db" + unique_resource_id("_")
328+
temp_db = instance.database(temp_db_id)
329+
temp_db.spanner_api
330+
331+
# Endpoint is cached - resolved endpoint used.
332+
self.assertIn(Config.INSTANCE.name, client._endpoint_cache)
333+
self.assertEqual(
334+
client._endpoint_cache[Config.INSTANCE.name], resolved_endpoint
335+
)
336+
337+
# Endpoint is cached at a class level.
338+
self.assertIn(Config.INSTANCE.name, Config.CLIENT._endpoint_cache)
339+
self.assertEqual(
340+
Config.CLIENT._endpoint_cache[Config.INSTANCE.name], resolved_endpoint
341+
)
342+
285343
def test_list_databases(self):
286344
# Since `Config.INSTANCE` is newly created in `setUpModule`, the
287345
# database created in `setUpClass` here will be the only one.

spanner/tests/unit/test_client.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def _constructor_test_helper(
5757
user_agent=None,
5858
client_options=None,
5959
):
60+
import google.api_core.client_options
6061
from google.cloud.spanner_v1 import client as MUT
6162

6263
kwargs = {}
@@ -66,6 +67,14 @@ def _constructor_test_helper(
6667
else:
6768
expected_client_info = MUT._CLIENT_INFO
6869

70+
kwargs["client_options"] = client_options
71+
if type(client_options) == dict:
72+
expected_client_options = google.api_core.client_options.from_dict(
73+
client_options
74+
)
75+
else:
76+
expected_client_options = client_options
77+
6978
client = self._make_one(
7079
project=self.PROJECT, credentials=creds, user_agent=user_agent, **kwargs
7180
)
@@ -80,7 +89,14 @@ def _constructor_test_helper(
8089
self.assertEqual(client.project, self.PROJECT)
8190
self.assertIs(client._client_info, expected_client_info)
8291
self.assertEqual(client.user_agent, user_agent)
83-
self.assertEqual(client._client_options, client_options)
92+
if expected_client_options is not None:
93+
self.assertIsInstance(
94+
client._client_options, google.api_core.client_options.ClientOptions
95+
)
96+
self.assertEqual(
97+
client._client_options.api_endpoint,
98+
expected_client_options.api_endpoint,
99+
)
84100

85101
def test_constructor_default_scopes(self):
86102
from google.cloud.spanner_v1 import client as MUT
@@ -127,6 +143,27 @@ def test_constructor_credentials_wo_create_scoped(self):
127143
expected_scopes = None
128144
self._constructor_test_helper(expected_scopes, creds)
129145

146+
def test_constructor_custom_client_options_obj(self):
147+
from google.api_core.client_options import ClientOptions
148+
from google.cloud.spanner_v1 import client as MUT
149+
150+
expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,)
151+
creds = _make_credentials()
152+
self._constructor_test_helper(
153+
expected_scopes,
154+
creds,
155+
client_options=ClientOptions(api_endpoint="endpoint"),
156+
)
157+
158+
def test_constructor_custom_client_options_dict(self):
159+
from google.cloud.spanner_v1 import client as MUT
160+
161+
expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,)
162+
creds = _make_credentials()
163+
self._constructor_test_helper(
164+
expected_scopes, creds, client_options={"api_endpoint": "endpoint"}
165+
)
166+
130167
def test_instance_admin_api(self):
131168
from google.cloud.spanner_v1.client import SPANNER_ADMIN_SCOPE
132169

0 commit comments

Comments
 (0)