Skip to content

Commit 9711b83

Browse files
feat: Use session temp tables for all ephemeral storage (#1569)
* feat: Use session temp tables for all ephemeral storage * fix issues * fallback to anon dataset * fix test_clean_up_via_context_manager * fix flaky test_bq_session_create_temp_table_clustered * Update bigframes/session/__init__.py --------- Co-authored-by: Tim Sweña (Swast) <[email protected]>
1 parent 7ad2f23 commit 9711b83

13 files changed

+128
-133
lines changed

bigframes/blob/_functions.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def _output_bq_type(self):
6969
def _create_udf(self):
7070
"""Create Python UDF in BQ. Return name of the UDF."""
7171
udf_name = str(
72-
self._session._loader._storage_manager.generate_unique_resource_id()
72+
self._session._anon_dataset_manager.generate_unique_resource_id()
7373
)
7474

7575
func_body = inspect.getsource(self._func)

bigframes/dataframe.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3775,7 +3775,7 @@ def to_gbq(
37753775

37763776
# The client code owns this table reference now
37773777
temp_table_ref = (
3778-
self._session._temp_storage_manager.generate_unique_resource_id()
3778+
self._session._anon_dataset_manager.generate_unique_resource_id()
37793779
)
37803780
destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"
37813781

bigframes/session/__init__.py

+26-27
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,14 @@
7070
import bigframes.dtypes
7171
import bigframes.functions._function_session as bff_session
7272
import bigframes.functions.function as bff
73+
from bigframes.session import bigquery_session
7374
import bigframes.session._io.bigquery as bf_io_bigquery
75+
import bigframes.session.anonymous_dataset
7476
import bigframes.session.clients
7577
import bigframes.session.executor
7678
import bigframes.session.loader
7779
import bigframes.session.metrics
7880
import bigframes.session.planner
79-
import bigframes.session.temp_storage
8081
import bigframes.session.validation
8182

8283
# Avoid circular imports.
@@ -247,14 +248,26 @@ def __init__(
247248

248249
self._metrics = bigframes.session.metrics.ExecutionMetrics()
249250
self._function_session = bff_session.FunctionSession()
250-
self._temp_storage_manager = (
251-
bigframes.session.temp_storage.AnonymousDatasetManager(
251+
self._anon_dataset_manager = (
252+
bigframes.session.anonymous_dataset.AnonymousDatasetManager(
252253
self._clients_provider.bqclient,
253254
location=self._location,
254255
session_id=self._session_id,
255256
kms_key=self._bq_kms_key_name,
256257
)
257258
)
259+
# Session temp tables don't support specifying kms key, so use anon dataset if kms key specified
260+
self._session_resource_manager = (
261+
bigquery_session.SessionResourceManager(
262+
self.bqclient,
263+
self._location,
264+
)
265+
if (self._bq_kms_key_name is None)
266+
else None
267+
)
268+
self._temp_storage_manager = (
269+
self._session_resource_manager or self._anon_dataset_manager
270+
)
258271
self._executor: bigframes.session.executor.Executor = (
259272
bigframes.session.executor.BigQueryCachingExecutor(
260273
bqclient=self._clients_provider.bqclient,
@@ -375,7 +388,7 @@ def _allows_ambiguity(self) -> bool:
375388

376389
@property
377390
def _anonymous_dataset(self):
378-
return self._temp_storage_manager.dataset
391+
return self._anon_dataset_manager.dataset
379392

380393
def __hash__(self):
381394
# Stable hash needed to use in expression tree
@@ -388,9 +401,11 @@ def close(self):
388401

389402
# Protect against failure when the Session is a fake for testing or
390403
# failed to initialize.
391-
temp_storage_manager = getattr(self, "_temp_storage_manager", None)
392-
if temp_storage_manager:
393-
self._temp_storage_manager.clean_up_tables()
404+
if anon_dataset_manager := getattr(self, "_anon_dataset_manager", None):
405+
anon_dataset_manager.close()
406+
407+
if session_resource_manager := getattr(self, "_session_resource_manager", None):
408+
session_resource_manager.close()
394409

395410
remote_function_session = getattr(self, "_function_session", None)
396411
if remote_function_session:
@@ -906,8 +921,6 @@ def read_csv(
906921
engine=engine,
907922
write_engine=write_engine,
908923
)
909-
table = self._temp_storage_manager.allocate_temp_table()
910-
911924
if engine is not None and engine == "bigquery":
912925
if any(param is not None for param in (dtype, names)):
913926
not_supported = ("dtype", "names")
@@ -967,9 +980,7 @@ def read_csv(
967980
)
968981

969982
job_config = bigquery.LoadJobConfig()
970-
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
971983
job_config.source_format = bigquery.SourceFormat.CSV
972-
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
973984
job_config.autodetect = True
974985
job_config.field_delimiter = sep
975986
job_config.encoding = encoding
@@ -983,9 +994,8 @@ def read_csv(
983994
elif header > 0:
984995
job_config.skip_leading_rows = header
985996

986-
return self._loader._read_bigquery_load_job(
997+
return self._loader.read_bigquery_load_job(
987998
filepath_or_buffer,
988-
table,
989999
job_config=job_config,
9901000
index_col=index_col,
9911001
columns=columns,
@@ -1052,18 +1062,12 @@ def read_parquet(
10521062
engine=engine,
10531063
write_engine=write_engine,
10541064
)
1055-
table = self._temp_storage_manager.allocate_temp_table()
1056-
10571065
if engine == "bigquery":
10581066
job_config = bigquery.LoadJobConfig()
1059-
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
10601067
job_config.source_format = bigquery.SourceFormat.PARQUET
1061-
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
10621068
job_config.labels = {"bigframes-api": "read_parquet"}
10631069

1064-
return self._loader._read_bigquery_load_job(
1065-
path, table, job_config=job_config
1066-
)
1070+
return self._loader.read_bigquery_load_job(path, job_config=job_config)
10671071
else:
10681072
if "*" in path:
10691073
raise ValueError(
@@ -1106,8 +1110,6 @@ def read_json(
11061110
engine=engine,
11071111
write_engine=write_engine,
11081112
)
1109-
table = self._temp_storage_manager.allocate_temp_table()
1110-
11111113
if engine == "bigquery":
11121114

11131115
if dtype is not None:
@@ -1131,16 +1133,13 @@ def read_json(
11311133
)
11321134

11331135
job_config = bigquery.LoadJobConfig()
1134-
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
11351136
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
1136-
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
11371137
job_config.autodetect = True
11381138
job_config.encoding = encoding
11391139
job_config.labels = {"bigframes-api": "read_json"}
11401140

1141-
return self._loader._read_bigquery_load_job(
1141+
return self._loader.read_bigquery_load_job(
11421142
path_or_buf,
1143-
table,
11441143
job_config=job_config,
11451144
)
11461145
else:
@@ -1713,7 +1712,7 @@ def _start_query_ml_ddl(
17131712

17141713
def _create_object_table(self, path: str, connection: str) -> str:
17151714
"""Create a random id Object Table from the input path and connection."""
1716-
table = str(self._loader._storage_manager.generate_unique_resource_id())
1715+
table = str(self._anon_dataset_manager.generate_unique_resource_id())
17171716

17181717
import textwrap
17191718

bigframes/session/temp_storage.py bigframes/session/anonymous_dataset.py

+13-7
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
import google.cloud.bigquery as bigquery
2020

21-
import bigframes.constants as constants
21+
from bigframes import constants
22+
from bigframes.session import temporary_storage
2223
import bigframes.session._io.bigquery as bf_io_bigquery
2324

2425
_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
2526

2627

27-
class AnonymousDatasetManager:
28+
class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
2829
"""
2930
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
3031
"""
@@ -38,19 +39,23 @@ def __init__(
3839
kms_key: Optional[str] = None
3940
):
4041
self.bqclient = bqclient
41-
self.location = location
42+
self._location = location
4243
self.dataset = bf_io_bigquery.create_bq_dataset_reference(
4344
self.bqclient,
44-
location=self.location,
45+
location=self._location,
4546
api_name="session-__init__",
4647
)
4748

4849
self.session_id = session_id
4950
self._table_ids: List[bigquery.TableReference] = []
5051
self._kms_key = kms_key
5152

52-
def allocate_and_create_temp_table(
53-
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str]
53+
@property
54+
def location(self):
55+
return self._location
56+
57+
def create_temp_table(
58+
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
5459
) -> bigquery.TableReference:
5560
"""
5661
Allocates and creates a table in the anonymous dataset.
@@ -99,7 +104,8 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
99104
)
100105
return self.dataset.table(table_id)
101106

102-
def clean_up_tables(self):
107+
def close(self):
103108
"""Delete tables that were created with this session's session_id."""
104109
for table_ref in self._table_ids:
105110
self.bqclient.delete_table(table_ref, not_found_ok=True)
111+
self._table_ids.clear()

bigframes/session/bigquery_session.py

+9-12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import google.cloud.bigquery as bigquery
2424

2525
from bigframes.core.compile import googlesql
26+
from bigframes.session import temporary_storage
2627

2728
KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0
2829

@@ -32,21 +33,22 @@
3233
logger = logging.getLogger(__name__)
3334

3435

35-
class SessionResourceManager:
36+
class SessionResourceManager(temporary_storage.TemporaryStorageManager):
3637
"""
3738
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
3839
"""
3940

40-
def __init__(
41-
self, bqclient: bigquery.Client, location: str, *, kms_key: Optional[str] = None
42-
):
41+
def __init__(self, bqclient: bigquery.Client, location: str):
4342
self.bqclient = bqclient
44-
self.location = location
45-
self._kms_key = kms_key
43+
self._location = location
4644
self._session_id: Optional[str] = None
4745
self._sessiondaemon: Optional[RecurringTaskDaemon] = None
4846
self._session_lock = threading.RLock()
4947

48+
@property
49+
def location(self):
50+
return self._location
51+
5052
def create_temp_table(
5153
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
5254
) -> bigquery.TableReference:
@@ -56,17 +58,13 @@ def create_temp_table(
5658
with self._session_lock:
5759
table_ref = bigquery.TableReference(
5860
bigquery.DatasetReference(self.bqclient.project, "_SESSION"),
59-
uuid.uuid4().hex,
61+
f"bqdf_{uuid.uuid4()}",
6062
)
6163
job_config = bigquery.QueryJobConfig(
6264
connection_properties=[
6365
bigquery.ConnectionProperty("session_id", self._get_session_id())
6466
]
6567
)
66-
if self._kms_key:
67-
job_config.destination_encryption_configuration = (
68-
bigquery.EncryptionConfiguration(kms_key_name=self._kms_key)
69-
)
7068

7169
ibis_schema = ibis_bq.BigQuerySchema.to_ibis(list(schema))
7270

@@ -87,7 +85,6 @@ def create_temp_table(
8785
ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}"
8886

8987
job = self.bqclient.query(ddl, job_config=job_config)
90-
9188
job.result()
9289
# return the fully qualified table, so it can be used outside of the session
9390
return job.destination

bigframes/session/executor.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import bigframes.session._io.bigquery as bq_io
5454
import bigframes.session.metrics
5555
import bigframes.session.planner
56-
import bigframes.session.temp_storage
56+
import bigframes.session.temporary_storage
5757

5858
# Max complexity that should be executed as a single query
5959
QUERY_COMPLEXITY_LIMIT = 1e7
@@ -195,7 +195,7 @@ class BigQueryCachingExecutor(Executor):
195195
def __init__(
196196
self,
197197
bqclient: bigquery.Client,
198-
storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
198+
storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
199199
bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
200200
*,
201201
strictly_ordered: bool = True,
@@ -221,7 +221,7 @@ def to_sql(
221221
enable_cache: bool = True,
222222
) -> str:
223223
if offset_column:
224-
array_value, internal_offset_col = array_value.promote_offsets()
224+
array_value, _ = array_value.promote_offsets()
225225
node = (
226226
self.replace_cached_subtrees(array_value.node)
227227
if enable_cache
@@ -248,7 +248,7 @@ def execute(
248248
job_config = bigquery.QueryJobConfig()
249249
# Use explicit destination to avoid 10GB limit of temporary table
250250
if use_explicit_destination:
251-
destination_table = self.storage_manager.allocate_and_create_temp_table(
251+
destination_table = self.storage_manager.create_temp_table(
252252
array_value.schema.to_bigquery(), cluster_cols=[]
253253
)
254254
job_config.destination = destination_table
@@ -392,7 +392,7 @@ def peek(
392392
job_config = bigquery.QueryJobConfig()
393393
# Use explicit destination to avoid 10GB limit of temporary table
394394
if use_explicit_destination:
395-
destination_table = self.storage_manager.allocate_and_create_temp_table(
395+
destination_table = self.storage_manager.create_temp_table(
396396
array_value.schema.to_bigquery(), cluster_cols=[]
397397
)
398398
job_config.destination = destination_table
@@ -645,9 +645,7 @@ def _sql_as_cached_temp_table(
645645
cluster_cols: Sequence[str],
646646
) -> bigquery.TableReference:
647647
assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS
648-
temp_table = self.storage_manager.allocate_and_create_temp_table(
649-
schema, cluster_cols
650-
)
648+
temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)
651649

652650
# TODO: Get default job config settings
653651
job_config = cast(

0 commit comments

Comments
 (0)