Skip to content

Commit 84ad11d

Browse files
shollymantswast
andauthored
feat: add remote function options to routines (#1558)
* feat: add remote function options This PR adds support for defining routines as remote UDFs. * basic integration test * augment tests * rename prop * augment tests * more testing * cover shenanigans --------- Co-authored-by: Tim Swast <[email protected]>
1 parent 9ea2e21 commit 84ad11d

File tree

5 files changed

+342
-0
lines changed

5 files changed

+342
-0
lines changed

google/cloud/bigquery/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
from google.cloud.bigquery.routine import RoutineArgument
9494
from google.cloud.bigquery.routine import RoutineReference
9595
from google.cloud.bigquery.routine import RoutineType
96+
from google.cloud.bigquery.routine import RemoteFunctionOptions
9697
from google.cloud.bigquery.schema import PolicyTagList
9798
from google.cloud.bigquery.schema import SchemaField
9899
from google.cloud.bigquery.standard_sql import StandardSqlDataType
@@ -154,6 +155,7 @@
154155
"Routine",
155156
"RoutineArgument",
156157
"RoutineReference",
158+
"RemoteFunctionOptions",
157159
# Shared helpers
158160
"SchemaField",
159161
"PolicyTagList",

google/cloud/bigquery/routine/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from google.cloud.bigquery.routine.routine import RoutineArgument
2121
from google.cloud.bigquery.routine.routine import RoutineReference
2222
from google.cloud.bigquery.routine.routine import RoutineType
23+
from google.cloud.bigquery.routine.routine import RemoteFunctionOptions
2324

2425

2526
__all__ = (
@@ -28,4 +29,5 @@
2829
"RoutineArgument",
2930
"RoutineReference",
3031
"RoutineType",
32+
"RemoteFunctionOptions",
3133
)

google/cloud/bigquery/routine/routine.py

+153
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class Routine(object):
6767
"type_": "routineType",
6868
"description": "description",
6969
"determinism_level": "determinismLevel",
70+
"remote_function_options": "remoteFunctionOptions",
7071
}
7172

7273
def __init__(self, routine_ref, **kwargs) -> None:
@@ -297,6 +298,37 @@ def determinism_level(self):
297298
def determinism_level(self, value):
298299
self._properties[self._PROPERTY_TO_API_FIELD["determinism_level"]] = value
299300

301+
@property
302+
def remote_function_options(self):
303+
"""Optional[google.cloud.bigquery.routine.RemoteFunctionOptions]: Configures remote function
304+
options for a routine.
305+
306+
Raises:
307+
ValueError:
308+
If the value is not
309+
:class:`~google.cloud.bigquery.routine.RemoteFunctionOptions` or
310+
:data:`None`.
311+
"""
312+
prop = self._properties.get(
313+
self._PROPERTY_TO_API_FIELD["remote_function_options"]
314+
)
315+
if prop is not None:
316+
return RemoteFunctionOptions.from_api_repr(prop)
317+
318+
@remote_function_options.setter
319+
def remote_function_options(self, value):
320+
api_repr = value
321+
if isinstance(value, RemoteFunctionOptions):
322+
api_repr = value.to_api_repr()
323+
elif value is not None:
324+
raise ValueError(
325+
"value must be google.cloud.bigquery.routine.RemoteFunctionOptions "
326+
"or None"
327+
)
328+
self._properties[
329+
self._PROPERTY_TO_API_FIELD["remote_function_options"]
330+
] = api_repr
331+
300332
@classmethod
301333
def from_api_repr(cls, resource: dict) -> "Routine":
302334
"""Factory: construct a routine given its API representation.
@@ -563,3 +595,124 @@ def __str__(self):
563595
This is a fully-qualified ID, including the project ID and dataset ID.
564596
"""
565597
return "{}.{}.{}".format(self.project, self.dataset_id, self.routine_id)
598+
599+
600+
class RemoteFunctionOptions(object):
601+
"""Configuration options for controlling remote BigQuery functions."""
602+
603+
_PROPERTY_TO_API_FIELD = {
604+
"endpoint": "endpoint",
605+
"connection": "connection",
606+
"max_batching_rows": "maxBatchingRows",
607+
"user_defined_context": "userDefinedContext",
608+
}
609+
610+
def __init__(
611+
self,
612+
endpoint=None,
613+
connection=None,
614+
max_batching_rows=None,
615+
user_defined_context=None,
616+
_properties=None,
617+
) -> None:
618+
if _properties is None:
619+
_properties = {}
620+
self._properties = _properties
621+
622+
if endpoint is not None:
623+
self.endpoint = endpoint
624+
if connection is not None:
625+
self.connection = connection
626+
if max_batching_rows is not None:
627+
self.max_batching_rows = max_batching_rows
628+
if user_defined_context is not None:
629+
self.user_defined_context = user_defined_context
630+
631+
@property
632+
def connection(self):
633+
"""string: Fully qualified name of the user-provided connection object which holds the authentication information to send requests to the remote service.
634+
635+
Format is "projects/{projectId}/locations/{locationId}/connections/{connectionId}"
636+
"""
637+
return _helpers._str_or_none(self._properties.get("connection"))
638+
639+
@connection.setter
640+
def connection(self, value):
641+
self._properties["connection"] = _helpers._str_or_none(value)
642+
643+
@property
644+
def endpoint(self):
645+
"""string: Endpoint of the user-provided remote service
646+
647+
Example: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"
648+
"""
649+
return _helpers._str_or_none(self._properties.get("endpoint"))
650+
651+
@endpoint.setter
652+
def endpoint(self, value):
653+
self._properties["endpoint"] = _helpers._str_or_none(value)
654+
655+
@property
656+
def max_batching_rows(self):
657+
"""int64: Max number of rows in each batch sent to the remote service.
658+
659+
If absent or if 0, BigQuery dynamically decides the number of rows in a batch.
660+
"""
661+
return _helpers._int_or_none(self._properties.get("maxBatchingRows"))
662+
663+
@max_batching_rows.setter
664+
def max_batching_rows(self, value):
665+
self._properties["maxBatchingRows"] = _helpers._str_or_none(value)
666+
667+
@property
668+
def user_defined_context(self):
669+
"""Dict[str, str]: User-defined context as a set of key/value pairs,
670+
which will be sent as function invocation context together with
671+
batched arguments in the requests to the remote service. The total
672+
number of bytes of keys and values must be less than 8KB.
673+
"""
674+
return self._properties.get("userDefinedContext")
675+
676+
@user_defined_context.setter
677+
def user_defined_context(self, value):
678+
if not isinstance(value, dict):
679+
raise ValueError("value must be dictionary")
680+
self._properties["userDefinedContext"] = value
681+
682+
@classmethod
683+
def from_api_repr(cls, resource: dict) -> "RemoteFunctionOptions":
684+
"""Factory: construct remote function options given its API representation.
685+
686+
Args:
687+
resource (Dict[str, object]): Resource, as returned from the API.
688+
689+
Returns:
690+
google.cloud.bigquery.routine.RemoteFunctionOptions:
691+
Python object, as parsed from ``resource``.
692+
"""
693+
ref = cls()
694+
ref._properties = resource
695+
return ref
696+
697+
def to_api_repr(self) -> dict:
698+
"""Construct the API resource representation of this RemoteFunctionOptions.
699+
700+
Returns:
701+
Dict[str, object]: Remote function options represented as an API resource.
702+
"""
703+
return self._properties
704+
705+
def __eq__(self, other):
706+
if not isinstance(other, RemoteFunctionOptions):
707+
return NotImplemented
708+
return self._properties == other._properties
709+
710+
def __ne__(self, other):
711+
return not self == other
712+
713+
def __repr__(self):
714+
all_properties = [
715+
"{}={}".format(property_name, repr(getattr(self, property_name)))
716+
for property_name in sorted(self._PROPERTY_TO_API_FIELD)
717+
]
718+
return "RemoteFunctionOptions({})".format(", ".join(all_properties))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright 2023 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# https://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import pytest
18+
19+
ENDPOINT = "https://some.endpoint"
20+
CONNECTION = "connection_string"
21+
MAX_BATCHING_ROWS = 50
22+
USER_DEFINED_CONTEXT = {
23+
"foo": "bar",
24+
}
25+
26+
27+
@pytest.fixture
28+
def target_class():
29+
from google.cloud.bigquery.routine import RemoteFunctionOptions
30+
31+
return RemoteFunctionOptions
32+
33+
34+
def test_ctor(target_class):
35+
36+
options = target_class(
37+
endpoint=ENDPOINT,
38+
connection=CONNECTION,
39+
max_batching_rows=MAX_BATCHING_ROWS,
40+
user_defined_context=USER_DEFINED_CONTEXT,
41+
)
42+
assert options.endpoint == ENDPOINT
43+
assert options.connection == CONNECTION
44+
assert options.max_batching_rows == MAX_BATCHING_ROWS
45+
assert options.user_defined_context == USER_DEFINED_CONTEXT
46+
47+
48+
def test_empty_ctor(target_class):
49+
options = target_class()
50+
assert options._properties == {}
51+
options = target_class(_properties=None)
52+
assert options._properties == {}
53+
options = target_class(_properties={})
54+
assert options._properties == {}
55+
56+
57+
def test_ctor_bad_context(target_class):
58+
with pytest.raises(ValueError, match="value must be dictionary"):
59+
target_class(user_defined_context=[1, 2, 3, 4])
60+
61+
62+
def test_from_api_repr(target_class):
63+
resource = {
64+
"endpoint": ENDPOINT,
65+
"connection": CONNECTION,
66+
"maxBatchingRows": MAX_BATCHING_ROWS,
67+
"userDefinedContext": USER_DEFINED_CONTEXT,
68+
"someRandomField": "someValue",
69+
}
70+
options = target_class.from_api_repr(resource)
71+
assert options.endpoint == ENDPOINT
72+
assert options.connection == CONNECTION
73+
assert options.max_batching_rows == MAX_BATCHING_ROWS
74+
assert options.user_defined_context == USER_DEFINED_CONTEXT
75+
assert options._properties["someRandomField"] == "someValue"
76+
77+
78+
def test_from_api_repr_w_minimal_resource(target_class):
79+
resource = {}
80+
options = target_class.from_api_repr(resource)
81+
assert options.endpoint is None
82+
assert options.connection is None
83+
assert options.max_batching_rows is None
84+
assert options.user_defined_context is None
85+
86+
87+
def test_from_api_repr_w_unknown_fields(target_class):
88+
resource = {"thisFieldIsNotInTheProto": "just ignore me"}
89+
options = target_class.from_api_repr(resource)
90+
assert options._properties is resource
91+
92+
93+
def test_eq(target_class):
94+
options = target_class(
95+
endpoint=ENDPOINT,
96+
connection=CONNECTION,
97+
max_batching_rows=MAX_BATCHING_ROWS,
98+
user_defined_context=USER_DEFINED_CONTEXT,
99+
)
100+
other_options = target_class(
101+
endpoint=ENDPOINT,
102+
connection=CONNECTION,
103+
max_batching_rows=MAX_BATCHING_ROWS,
104+
user_defined_context=USER_DEFINED_CONTEXT,
105+
)
106+
assert options == other_options
107+
assert not (options != other_options)
108+
109+
empty_options = target_class()
110+
assert not (options == empty_options)
111+
assert options != empty_options
112+
113+
notanarg = object()
114+
assert not (options == notanarg)
115+
assert options != notanarg
116+
117+
118+
def test_repr(target_class):
119+
options = target_class(
120+
endpoint=ENDPOINT,
121+
connection=CONNECTION,
122+
max_batching_rows=MAX_BATCHING_ROWS,
123+
user_defined_context=USER_DEFINED_CONTEXT,
124+
)
125+
actual_repr = repr(options)
126+
assert actual_repr == (
127+
"RemoteFunctionOptions(connection='connection_string', endpoint='https://some.endpoint', max_batching_rows=50, user_defined_context={'foo': 'bar'})"
128+
)

0 commit comments

Comments
 (0)