Skip to content

Commit 019cae5

Browse files
authored
refactor(backends): clean up resources produced by memtable (#10055)
1 parent 62c63d2 commit 019cae5

File tree

17 files changed: +138 -92 lines changed

ibis/backends/__init__.py

+15
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
import abc
44
import collections.abc
5+
import contextlib
56
import functools
67
import importlib.metadata
78
import keyword
@@ -1116,13 +1117,27 @@ def _register_in_memory_tables(self, expr: ir.Expr) -> None:
11161117
for memtable in expr.op().find(ops.InMemoryTable):
11171118
if not self._in_memory_table_exists(memtable.name):
11181119
self._register_in_memory_table(memtable)
1120+
weakref.finalize(
1121+
memtable, self._finalize_in_memory_table, memtable.name
1122+
)
11191123

11201124
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
11211125
if self.supports_in_memory_tables:
11221126
raise NotImplementedError(
11231127
f"{self.name} must implement `_register_in_memory_table` to support in-memory tables"
11241128
)
11251129

1130+
def _finalize_in_memory_table(self, name: str) -> None:
1131+
"""Wrap `_finalize_memtable` to suppress exceptions."""
1132+
with contextlib.suppress(Exception):
1133+
self._finalize_memtable(name)
1134+
1135+
def _finalize_memtable(self, name: str) -> None:
1136+
if self.supports_in_memory_tables:
1137+
raise NotImplementedError(
1138+
f"{self.name} must implement `_finalize_memtable` to support in-memory tables"
1139+
)
1140+
11261141
def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
11271142
"""Backend-specific hooks to run before an expression is executed."""
11281143
self._register_udfs(expr)

ibis/backends/bigquery/__init__.py

+11
Original file line number | Diff line number | Diff line change
@@ -180,6 +180,17 @@ def _in_memory_table_exists(self, name: str) -> bool:
180180
else:
181181
return True
182182

183+
def _finalize_memtable(self, name: str) -> None:
184+
session_dataset = self._session_dataset
185+
table_id = sg.table(
186+
name,
187+
db=session_dataset.dataset_id,
188+
catalog=session_dataset.project,
189+
quoted=False,
190+
)
191+
drop_sql_stmt = sge.Drop(kind="TABLE", this=table_id, exists=True)
192+
self.raw_sql(drop_sql_stmt)
193+
183194
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
184195
session_dataset = self._session_dataset
185196

ibis/backends/duckdb/__init__.py

+5-8
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@
77
import os
88
import urllib
99
import warnings
10-
import weakref
1110
from operator import itemgetter
1211
from pathlib import Path
1312
from typing import TYPE_CHECKING, Any
@@ -160,12 +159,9 @@ def create_table(
160159
properties.append(sge.TemporaryProperty())
161160
catalog = "temp"
162161

163-
temp_memtable_view = None
164-
165162
if obj is not None:
166163
if not isinstance(obj, ir.Expr):
167164
table = ibis.memtable(obj)
168-
temp_memtable_view = table.op().name
169165
else:
170166
table = obj
171167

@@ -234,9 +230,6 @@ def create_table(
234230
).sql(self.name)
235231
)
236232

237-
if temp_memtable_view is not None:
238-
self.con.unregister(temp_memtable_view)
239-
240233
return self.table(name, database=(catalog, database))
241234

242235
def table(
@@ -1620,11 +1613,15 @@ def _in_memory_table_exists(self, name: str) -> bool:
16201613
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
16211614
self.con.register(op.name, op.data.to_pyarrow(op.schema))
16221615

1616+
def _finalize_memtable(self, name: str) -> None:
16231617
# if we don't aggressively unregister tables duckdb will keep a
16241618
# reference to every memtable ever registered, even if there's no
16251619
# way for a user to access the operation anymore, resulting in a
16261620
# memory leak
1627-
weakref.finalize(op, self.con.unregister, op.name)
1621+
#
1622+
# we can't use drop_table, because self.con.register creates a view, so
1623+
# use the corresponding unregister method
1624+
self.con.unregister(name)
16281625

16291626
def _register_udfs(self, expr: ir.Expr) -> None:
16301627
con = self.con

ibis/backends/exasol/__init__.py

+12-17
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import atexit
43
import contextlib
54
import datetime
65
import re
@@ -42,7 +41,6 @@ class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
4241
compiler = sc.exasol.compiler
4342
supports_temporary_tables = False
4443
supports_create_or_replace = False
45-
supports_in_memory_tables = False
4644
supports_python_udfs = False
4745

4846
@property
@@ -278,14 +276,15 @@ def process_item(item: Any):
278276
with self._safe_raw_sql(create_stmt_sql):
279277
if not df.empty:
280278
self.con.ext.insert_multi(name, rows)
281-
atexit.register(self._clean_up_tmp_table, ident)
282279

283-
def _clean_up_tmp_table(self, ident: sge.Identifier) -> None:
284-
with self._safe_raw_sql(
285-
sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
286-
):
280+
def _clean_up_tmp_table(self, name: str) -> None:
281+
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
282+
sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
283+
with self._safe_raw_sql(sql):
287284
pass
288285

286+
_finalize_memtable = _clean_up_tmp_table
287+
289288
def create_table(
290289
self,
291290
name: str,
@@ -334,11 +333,9 @@ def create_table(
334333

335334
quoted = self.compiler.quoted
336335

337-
temp_memtable_view = None
338336
if obj is not None:
339337
if not isinstance(obj, ir.Expr):
340338
table = ibis.memtable(obj)
341-
temp_memtable_view = table.op().name
342339
else:
343340
table = obj
344341

@@ -356,31 +353,29 @@ def create_table(
356353
if not schema:
357354
schema = table.schema()
358355

359-
table = sg.table(temp_name, catalog=database, quoted=quoted)
360-
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
356+
table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
357+
target = sge.Schema(
358+
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
359+
)
361360

362361
create_stmt = sge.Create(kind="TABLE", this=target)
363362

364363
this = sg.table(name, catalog=database, quoted=quoted)
365364
with self._safe_raw_sql(create_stmt):
366365
if query is not None:
367366
self.con.execute(
368-
sge.Insert(this=table, expression=query).sql(self.name)
367+
sge.Insert(this=table_expr, expression=query).sql(self.name)
369368
)
370369

371370
if overwrite:
372371
self.con.execute(
373372
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
374373
)
375374
self.con.execute(
376-
f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}"
375+
f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}"
377376
)
378377

379378
if schema is None:
380-
# Clean up temporary memtable if we've created one
381-
# for in-memory reads
382-
if temp_memtable_view is not None:
383-
self.drop_table(temp_memtable_view)
384379
return self.table(name, database=database)
385380

386381
# preserve the input schema if it was provided

ibis/backends/mssql/__init__.py

+8-11
Original file line number | Diff line number | Diff line change
@@ -625,11 +625,9 @@ def create_table(
625625
properties.append(sge.TemporaryProperty())
626626
catalog, db = None, None
627627

628-
temp_memtable_view = None
629628
if obj is not None:
630629
if not isinstance(obj, ir.Expr):
631630
table = ibis.memtable(obj)
632-
temp_memtable_view = table.op().name
633631
else:
634632
table = obj
635633

@@ -647,19 +645,22 @@ def create_table(
647645
if not schema:
648646
schema = table.schema()
649647

650-
table = sg.table(
651-
"#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted
652-
)
648+
quoted = self.compiler.quoted
653649
raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False)
654-
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
650+
target = sge.Schema(
651+
this=sg.table(
652+
"#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted
653+
),
654+
expressions=schema.to_sqlglot(self.dialect),
655+
)
655656

656657
create_stmt = sge.Create(
657658
kind="TABLE",
658659
this=target,
659660
properties=sge.Properties(expressions=properties),
660661
)
661662

662-
this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted)
663+
this = sg.table(name, catalog=catalog, db=db, quoted=quoted)
663664
raw_this = sg.table(name, catalog=catalog, db=db, quoted=False)
664665
with self._safe_ddl(create_stmt) as cur:
665666
if query is not None:
@@ -692,10 +693,6 @@ def create_table(
692693
db = "dbo"
693694

694695
if schema is None:
695-
# Clean up temporary memtable if we've created one
696-
# for in-memory reads
697-
if temp_memtable_view is not None:
698-
self.drop_table(temp_memtable_view)
699696
return self.table(name, database=(catalog, db))
700697

701698
# preserve the input schema if it was provided

ibis/backends/mysql/__init__.py

+15-4
Original file line number | Diff line number | Diff line change
@@ -425,8 +425,10 @@ def create_table(
425425
if not schema:
426426
schema = table.schema()
427427

428-
table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
429-
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
428+
table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
429+
target = sge.Schema(
430+
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
431+
)
430432

431433
create_stmt = sge.Create(
432434
kind="TABLE",
@@ -437,15 +439,17 @@ def create_table(
437439
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
438440
with self._safe_raw_sql(create_stmt) as cur:
439441
if query is not None:
440-
insert_stmt = sge.Insert(this=table, expression=query).sql(self.name)
442+
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
443+
self.name
444+
)
441445
cur.execute(insert_stmt)
442446

443447
if overwrite:
444448
cur.execute(
445449
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
446450
)
447451
cur.execute(
448-
f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}"
452+
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
449453
)
450454

451455
if schema is None:
@@ -538,3 +542,10 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
538542
raise
539543
df = MySQLPandasData.convert_table(df, schema)
540544
return df
545+
546+
def _finalize_memtable(self, name: str) -> None:
547+
"""No-op.
548+
549+
Executing **any** SQL in a finalizer causes the underlying connection
550+
socket to be set to `None`. It is unclear why this happens.
551+
"""

ibis/backends/oracle/__init__.py

+10-13
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22

33
from __future__ import annotations
44

5-
import atexit
65
import contextlib
76
import re
87
import warnings
@@ -419,11 +418,9 @@ def create_table(
419418
if temp:
420419
properties.append(sge.TemporaryProperty())
421420

422-
temp_memtable_view = None
423421
if obj is not None:
424422
if not isinstance(obj, ir.Expr):
425423
table = ibis.memtable(obj)
426-
temp_memtable_view = table.op().name
427424
else:
428425
table = obj
429426

@@ -468,10 +465,6 @@ def create_table(
468465
)
469466

470467
if schema is None:
471-
# Clean up temporary memtable if we've created one
472-
# for in-memory reads
473-
if temp_memtable_view is not None:
474-
self.drop_table(temp_memtable_view)
475468
return self.table(name, database=database)
476469

477470
# preserve the input schema if it was provided
@@ -527,8 +520,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
527520
insert_stmt, list(data.iloc[start:end].itertuples(index=False))
528521
)
529522

530-
atexit.register(self._clean_up_tmp_table, name)
531-
532523
def _get_schema_using_query(self, query: str) -> sch.Schema:
533524
name = util.gen_name("oracle_metadata")
534525
dialect = self.name
@@ -608,6 +599,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
608599
return OraclePandasData.convert_table(df, schema)
609600

610601
def _clean_up_tmp_table(self, name: str) -> None:
602+
dialect = self.dialect
603+
604+
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
605+
606+
truncate = sge.TruncateTable(expressions=[ident]).sql(dialect)
607+
drop = sge.Drop(kind="TABLE", this=ident).sql(dialect)
608+
611609
with self.begin() as bind:
612610
# global temporary tables cannot be dropped without first truncating them
613611
#
@@ -616,9 +614,8 @@ def _clean_up_tmp_table(self, name: str) -> None:
616614
# ignore DatabaseError exceptions because the table may not exist
617615
# because it's already been deleted
618616
with contextlib.suppress(oracledb.DatabaseError):
619-
bind.execute(f'TRUNCATE TABLE "{name}"')
617+
bind.execute(truncate)
620618
with contextlib.suppress(oracledb.DatabaseError):
621-
bind.execute(f'DROP TABLE "{name}"')
619+
bind.execute(drop)
622620

623-
def _drop_cached_table(self, name):
624-
self._clean_up_tmp_table(name)
621+
_finalize_memtable = _drop_cached_table = _clean_up_tmp_table

ibis/backends/pandas/__init__.py

+3
Original file line number | Diff line number | Diff line change
@@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs):
331331
def _create_cached_table(self, name, expr):
332332
return self.create_table(name, expr.execute())
333333

334+
def _finalize_memtable(self, name: str) -> None:
335+
"""No-op, let Python handle clean up."""
336+
334337

335338
@lazy_singledispatch
336339
def _convert_object(obj: Any, _conn):

ibis/backends/polars/__init__.py

+3
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,9 @@ def _in_memory_table_exists(self, name: str) -> bool:
8181
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
8282
self._add_table(op.name, op.data.to_polars(op.schema).lazy())
8383

84+
def _finalize_memtable(self, name: str) -> None:
85+
self.drop_table(name, force=True)
86+
8487
@deprecated(
8588
as_of="9.1",
8689
instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",

0 commit comments

Comments
 (0)