-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathbigquery_options.py
297 lines (240 loc) · 11.1 KB
/
bigquery_options.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Options for BigQuery DataFrames."""
from __future__ import annotations
from typing import Literal, Optional
import warnings
import google.api_core.exceptions
import google.auth.credentials
import jellyfish
import bigframes.constants
import bigframes.enums
import bigframes.exceptions
# Error text used by the option setters when a value would change after the
# session has started; ``{attribute}`` is filled with the option's name.
SESSION_STARTED_MESSAGE = (
    "Cannot change '{attribute}' once a session has started. "
    "Call bigframes.pandas.close_session() first, "
    "if you are using the bigframes.pandas API."
)

# Warning text for a location that is not in the list of known locations;
# ``{possibility}`` is filled with the closest known location name.
UNKNOWN_LOCATION_MESSAGE = (
    "The location '{location}' is set to an unknown value. "
    "Did you mean '{possibility}'?"
)
def _validate_location(value: Optional[str]):
    """Warn when ``value`` is not a recognized BigQuery location.

    ``None`` and any entry of ``bigframes.constants.ALL_BIGQUERY_LOCATIONS``
    pass silently. For any other value an ``UnknownLocationWarning`` is
    issued, suggesting the known location with the smallest Levenshtein
    distance to the given value. The value itself is never modified.

    Args:
        value:
            The location to check, or None when no location is set.
    """
    if value is None:
        return

    known_locations = bigframes.constants.ALL_BIGQUERY_LOCATIONS
    if value in known_locations:
        return

    location = str(value)

    def _edit_distance(candidate):
        # Smaller distance means the candidate is a likelier intended value.
        return jellyfish.levenshtein_distance(location, candidate)

    closest = min(known_locations, key=_edit_distance)
    message = UNKNOWN_LOCATION_MESSAGE.format(
        location=location, possibility=closest
    )
    warnings.warn(
        message,
        # Attribute the warning to (possibly) the user's own code rather than
        # to this helper. The frames in between are:
        #   bpd.options.bigquery.location = "us-central-1"
        #     -> location.setter
        #       -> _validate_location
        stacklevel=3,
        category=bigframes.exceptions.UnknownLocationWarning,
    )
def _validate_ordering_mode(value: str) -> bigframes.enums.OrderingMode:
    """Convert an ordering-mode string into its ``OrderingMode`` member.

    The comparison is case-insensitive (both sides are case-folded).

    Args:
        value:
            The requested ordering mode, expected to be 'strict' or 'partial'.

    Returns:
        bigframes.enums.OrderingMode:
            ``STRICT`` or ``PARTIAL``, whichever matches ``value``.

    Raises:
        ValueError: If ``value`` matches neither supported mode.
    """
    folded = value.casefold()
    supported_modes = (
        bigframes.enums.OrderingMode.STRICT,
        bigframes.enums.OrderingMode.PARTIAL,
    )
    for mode in supported_modes:
        if folded == mode.value.casefold():
            return mode
    raise ValueError("Ordering mode must be one of 'strict' or 'partial'.")
class BigQueryOptions:
    """Encapsulates configuration for working with a session.

    Every option is exposed as a property. While ``_session_started`` is
    False, options can be freely reassigned. Once it is True, assigning a
    *different* value to any option raises ``ValueError``; re-assigning the
    current value is always allowed.
    """

    def __init__(
        self,
        credentials: Optional[google.auth.credentials.Credentials] = None,
        project: Optional[str] = None,
        location: Optional[str] = None,
        bq_connection: Optional[str] = None,
        use_regional_endpoints: bool = False,
        application_name: Optional[str] = None,
        kms_key_name: Optional[str] = None,
        skip_bq_connection_check: bool = False,
        *,
        ordering_mode: Literal["strict", "partial"] = "strict",
    ):
        """Store the initial option values.

        Args:
            credentials:
                The OAuth2 credentials to use for this client.
            project:
                Google Cloud project ID to use for billing and as the
                default project.
            location:
                Default location for job, datasets, and tables. An unknown
                location triggers a warning, exactly as the ``location``
                setter does.
            bq_connection:
                Name of the BigQuery connection to use.
            use_regional_endpoints:
                Flag to connect to regional API endpoints.
            application_name:
                The application name to amend to the user-agent sent to
                Google APIs.
            kms_key_name:
                Customer managed encryption key name.
            skip_bq_connection_check:
                Whether to skip creating/checking the BigQuery connection.
            ordering_mode:
                Either 'strict' or 'partial' (case-insensitive).

        Raises:
            ValueError: If ``ordering_mode`` is not a supported mode.
        """
        # Keep construction consistent with the ``location`` setter: warn on
        # an unknown location here too. The helper's stacklevel of 3 points
        # at the caller of this constructor.
        _validate_location(location)

        self._credentials = credentials
        self._project = project
        self._location = location
        self._bq_connection = bq_connection
        self._use_regional_endpoints = use_regional_endpoints
        self._application_name = application_name
        self._kms_key_name = kms_key_name
        self._skip_bq_connection_check = skip_bq_connection_check
        # Guards every setter below; options are frozen once this is True.
        self._session_started = False
        # Determines the ordering strictness for the session.
        self._ordering_mode = _validate_ordering_mode(ordering_mode)

    @property
    def application_name(self) -> Optional[str]:
        """The application name to amend to the user-agent sent to Google APIs.

        The recommended format is ``"application-name/major.minor.patch_version"``
        or ``"(gpn:PartnerName;)"`` for official Google partners.

        Returns:
            None or str:
                Application name as a string if exists; otherwise None.
        """
        return self._application_name

    @application_name.setter
    def application_name(self, value: Optional[str]):
        if self._session_started and self._application_name != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="application_name")
            )
        self._application_name = value

    @property
    def credentials(self) -> Optional[google.auth.credentials.Credentials]:
        """The OAuth2 credentials to use for this client.

        Returns:
            None or google.auth.credentials.Credentials:
                google.auth.credentials.Credentials if exists; otherwise None.
        """
        return self._credentials

    @credentials.setter
    def credentials(self, value: Optional[google.auth.credentials.Credentials]):
        # Identity (not equality) decides whether the credentials changed.
        if self._session_started and self._credentials is not value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="credentials"))
        self._credentials = value

    @property
    def location(self) -> Optional[str]:
        """Default location for job, datasets, and tables.

        For more information, see
        https://cloud.google.com/bigquery/docs/locations (BigQuery locations).

        Returns:
            None or str:
                Default location as a string; otherwise None.
        """
        return self._location

    @location.setter
    def location(self, value: Optional[str]):
        if self._session_started and self._location != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="location"))
        # Only warns about unknown locations; the value is stored regardless.
        _validate_location(value)
        self._location = value

    @property
    def project(self) -> Optional[str]:
        """Google Cloud project ID to use for billing and as the default project.

        Returns:
            None or str:
                Google Cloud project ID as a string; otherwise None.
        """
        return self._project

    @project.setter
    def project(self, value: Optional[str]):
        if self._session_started and self._project != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="project"))
        self._project = value

    @property
    def bq_connection(self) -> Optional[str]:
        """Name of the BigQuery connection to use in the form
        <PROJECT_NUMBER/PROJECT_ID>.<LOCATION>.<CONNECTION_ID>.

        You either need to create the connection in a location of your choice, or
        you need the Project Admin IAM role to enable the service to create the
        connection for you.

        If this option isn't provided, or the project or location isn't provided,
        then the default connection project/location/connection_id is used in
        the session.

        Returns:
            None or str:
                Name of the BigQuery connection as a string; otherwise None.
        """
        return self._bq_connection

    @bq_connection.setter
    def bq_connection(self, value: Optional[str]):
        if self._session_started and self._bq_connection != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="bq_connection"))
        self._bq_connection = value

    @property
    def skip_bq_connection_check(self) -> bool:
        """Forcibly use the BigQuery connection.

        Setting this flag to True would avoid creating the BigQuery connection
        and checking or setting IAM permissions on it. So if the BigQuery
        connection (default or user-provided) does not exist, or it does not have
        necessary permissions set up to support BigQuery DataFrames operations,
        then a runtime error will be reported.

        Returns:
            bool:
                True if creating the BigQuery connection and checking its
                permissions is skipped; otherwise False.
        """
        return self._skip_bq_connection_check

    @skip_bq_connection_check.setter
    def skip_bq_connection_check(self, value: bool):
        if self._session_started and self._skip_bq_connection_check != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="skip_bq_connection_check")
            )
        self._skip_bq_connection_check = value

    @property
    def use_regional_endpoints(self) -> bool:
        """Flag to connect to regional API endpoints.

        .. deprecated:: 0.13.0
            Use of regional endpoints is a feature in Preview and
            available only in selected regions and projects.

        Requires that ``location`` is set. For example, to connect to
        asia-northeast1-bigquery.googleapis.com, specify
        ``location='asia-northeast1'`` and ``use_regional_endpoints=True``.

        Returns:
            bool:
                True if regional API endpoints are to be used; otherwise
                False.
        """
        return self._use_regional_endpoints

    @use_regional_endpoints.setter
    def use_regional_endpoints(self, value: bool):
        if self._session_started and self._use_regional_endpoints != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="use_regional_endpoints")
            )
        # Enabling the flag always warns, even if it was already enabled.
        if value:
            warnings.warn(
                "Use of regional endpoints is a feature in preview and "
                "available only in selected regions and projects. "
            )
        self._use_regional_endpoints = value

    @property
    def kms_key_name(self) -> Optional[str]:
        """Customer managed encryption key used to control encryption of the
        data-at-rest in BigQuery. This is of the format
        projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY.

        For more information, see
        https://cloud.google.com/bigquery/docs/customer-managed-encryption
        (Customer-managed Cloud KMS keys).

        Make sure the project used for Bigquery DataFrames has the
        Cloud KMS CryptoKey Encrypter/Decrypter IAM role in the key's project.
        For more information, see
        https://cloud.google.com/bigquery/docs/customer-managed-encryption#assign_role
        (Assign the Encrypter/Decrypter).

        Returns:
            None or str:
                Name of the customer managed encryption key as a string; otherwise None.
        """
        return self._kms_key_name

    @kms_key_name.setter
    def kms_key_name(self, value: Optional[str]):
        # Annotated Optional to match the getter and the constructor default.
        if self._session_started and self._kms_key_name != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="kms_key_name"))
        self._kms_key_name = value

    @property
    def ordering_mode(self) -> Literal["strict", "partial"]:
        """Controls whether total row order is always maintained for DataFrame/Series.

        Returns:
            Literal:
                A literal string value of either strict or partial ordering mode.
        """
        return self._ordering_mode.value

    @ordering_mode.setter
    def ordering_mode(self, ordering_mode: Literal["strict", "partial"]) -> None:
        # Validate first so an unsupported value always reports the
        # "must be one of" error, and so the comparison below is done on the
        # normalized enum member (making it case-insensitive).
        validated_mode = _validate_ordering_mode(ordering_mode)
        # Like every other option, the mode is frozen once a session started.
        if self._session_started and self._ordering_mode != validated_mode:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="ordering_mode"))
        self._ordering_mode = validated_mode