13
13
# limitations under the License.
14
14
15
15
import typing
16
- import warnings
17
16
18
17
from google .cloud import bigquery
18
+ from google .cloud .bigquery_storage import types as bqstorage_types
19
+ import pandas
20
+ import pandas .testing
19
21
import pytest
20
22
21
23
import bigframes
@@ -41,6 +43,7 @@ def _assert_bq_execution_location(
41
43
42
44
assert typing .cast (bigquery .QueryJob , df .query_job ).location == expected_location
43
45
46
+ # Ensure operation involving BQ client suceeds
44
47
result = (
45
48
df [["name" , "number" ]]
46
49
.groupby ("name" )
@@ -53,6 +56,35 @@ def _assert_bq_execution_location(
53
56
typing .cast (bigquery .QueryJob , result .query_job ).location == expected_location
54
57
)
55
58
59
+ expected_result = pandas .DataFrame (
60
+ {"number" : [444 , 222 ]}, index = pandas .Index (["aaa" , "bbb" ], name = "name" )
61
+ )
62
+ pandas .testing .assert_frame_equal (
63
+ expected_result , result .to_pandas (), check_dtype = False , check_index_type = False
64
+ )
65
+
66
+ # Ensure BQ Storage Read client operation succceeds
67
+ table = result .query_job .destination
68
+ requested_session = bqstorage_types .ReadSession ( # type: ignore[attr-defined]
69
+ table = f"projects/{ table .project } /datasets/{ table .dataset_id } /tables/{ table .table_id } " ,
70
+ data_format = bqstorage_types .DataFormat .ARROW , # type: ignore[attr-defined]
71
+ )
72
+ read_session = session .bqstoragereadclient .create_read_session (
73
+ parent = f"projects/{ table .project } " ,
74
+ read_session = requested_session ,
75
+ max_stream_count = 1 ,
76
+ )
77
+ reader = session .bqstoragereadclient .read_rows (read_session .streams [0 ].name )
78
+ frames = []
79
+ for message in reader .rows ().pages :
80
+ frames .append (message .to_dataframe ())
81
+ read_dataframe = pandas .concat (frames )
82
+ # normalize before comparing since we lost some of the bigframes column
83
+ # naming abtractions in the direct read of the destination table
84
+ read_dataframe = read_dataframe .set_index ("name" )
85
+ read_dataframe .columns = result .columns
86
+ pandas .testing .assert_frame_equal (expected_result , read_dataframe )
87
+
56
88
57
89
def test_bq_location_default ():
58
90
session = bigframes .Session ()
@@ -119,22 +151,14 @@ def test_bq_location_non_canonical(set_location, resolved_location):
119
151
sorted (bigframes .constants .REP_ENABLED_BIGQUERY_LOCATIONS ),
120
152
)
121
153
def test_bq_rep_endpoints (bigquery_location ):
122
- with warnings .catch_warnings (record = True ) as record :
123
- warnings .simplefilter ("always" )
124
- session = bigframes .Session (
125
- context = bigframes .BigQueryOptions (
126
- location = bigquery_location , use_regional_endpoints = True
127
- )
128
- )
129
- assert (
130
- len ([warn for warn in record if isinstance (warn .message , FutureWarning )])
131
- == 0
154
+ session = bigframes .Session (
155
+ context = bigframes .BigQueryOptions (
156
+ location = bigquery_location , use_regional_endpoints = True
132
157
)
158
+ )
133
159
134
- # Verify that location and endpoints are correctly set for the BigQuery API
160
+ # Verify that location and endpoint is correctly set for the BigQuery API
135
161
# client
136
- # TODO(shobs): Figure out if the same can be verified for the other API
137
- # clients.
138
162
assert session .bqclient .location == bigquery_location
139
163
assert (
140
164
session .bqclient ._connection .API_BASE_URL
@@ -143,36 +167,52 @@ def test_bq_rep_endpoints(bigquery_location):
143
167
)
144
168
)
145
169
170
+ # Verify that endpoint is correctly set for the BigQuery Storage API client
171
+ # TODO(shobs): Figure out if we can verify that location is set in the
172
+ # BigQuery Storage API client.
173
+ assert (
174
+ session .bqstoragereadclient .api_endpoint
175
+ == f"bigquerystorage.{ bigquery_location } .rep.googleapis.com"
176
+ )
177
+
146
178
# assert that bigframes session honors the location
147
179
_assert_bq_execution_location (session )
148
180
149
181
182
+ def test_clients_provider_no_location ():
183
+ with pytest .raises (ValueError , match = "Must set location to use regional endpoints" ):
184
+ bigframes .session .clients .ClientsProvider (use_regional_endpoints = True )
185
+
186
+
150
187
@pytest .mark .parametrize (
151
188
"bigquery_location" ,
152
189
# Sort the set to avoid nondeterminism.
153
- sorted (bigframes .constants .LEP_ENABLED_BIGQUERY_LOCATIONS ),
190
+ sorted (bigframes .constants .REP_NOT_ENABLED_BIGQUERY_LOCATIONS ),
154
191
)
155
- def test_bq_lep_endpoints (bigquery_location ):
156
- # We are not testing BigFrames Session for LEP endpoints because it involves
157
- # query execution using the endpoint, which requires the project to be
158
- # allowlisted for LEP access. We could hardcode one project which is
159
- # allowlisted but then not every open source developer will have access to
160
- # that. Let's rely on just creating the clients for LEP.
161
- with pytest .warns (FutureWarning ) as record :
162
- clients_provider = bigframes .session .clients .ClientsProvider (
192
+ def test_clients_provider_use_regional_endpoints_non_rep_locations (bigquery_location ):
193
+ with pytest .raises (
194
+ ValueError ,
195
+ match = f"not .*available in the location { bigquery_location } " ,
196
+ ):
197
+ bigframes .session .clients .ClientsProvider (
163
198
location = bigquery_location , use_regional_endpoints = True
164
199
)
165
- assert len (record ) == 1
166
- assert bigquery_location in typing .cast (Warning , record [0 ].message ).args [0 ]
167
200
168
- # Verify that location and endpoints are correctly set for the BigQuery API
169
- # client
170
- # TODO(shobs): Figure out if the same can be verified for the other API
171
- # clients.
172
- assert clients_provider .bqclient .location == bigquery_location
173
- assert (
174
- clients_provider .bqclient ._connection .API_BASE_URL
175
- == "https://{location}-bigquery.googleapis.com" .format (
176
- location = bigquery_location
201
+
202
+ @pytest .mark .parametrize (
203
+ "bigquery_location" ,
204
+ # Sort the set to avoid nondeterminism.
205
+ sorted (bigframes .constants .REP_NOT_ENABLED_BIGQUERY_LOCATIONS ),
206
+ )
207
+ def test_session_init_fails_to_use_regional_endpoints_non_rep_endpoints (
208
+ bigquery_location ,
209
+ ):
210
+ with pytest .raises (
211
+ ValueError ,
212
+ match = f"not .*available in the location { bigquery_location } " ,
213
+ ):
214
+ bigframes .Session (
215
+ context = bigframes .BigQueryOptions (
216
+ location = bigquery_location , use_regional_endpoints = True
217
+ )
177
218
)
178
- )
0 commit comments