Skip to content

Commit f5f798e

Browse files
committed
fix(postgres): create server-side cursors in the iterator to ensure cursor is not cleaned up before it is used
1 parent 49673e5 commit f5f798e

File tree

1 file changed

+27
-35
lines changed

1 file changed

+27
-35
lines changed

ibis/backends/postgres/__init__.py

+27-35
Original file line numberDiff line numberDiff line change
@@ -757,41 +757,33 @@ def to_pyarrow_batches(
757757
import pandas as pd
758758
import pyarrow as pa
759759

760-
self._run_pre_execute_hooks(expr)
760+
def _batches(*, schema: pa.Schema, con: psycopg.Connection, query: str):
761+
columns = schema.names
762+
# server-side cursors need to be uniquely named
763+
with con.cursor(name=util.gen_name("postgres_cursor")) as cursor:
764+
self._register_hstore(cursor)
765+
766+
try:
767+
cur = cursor.execute(query)
768+
except Exception:
769+
con.rollback()
770+
raise
771+
else:
772+
try:
773+
while batch := cur.fetchmany(chunk_size):
774+
yield pa.RecordBatch.from_pandas(
775+
pd.DataFrame(batch, columns=columns), schema=schema
776+
)
777+
except Exception:
778+
con.rollback()
779+
raise
780+
else:
781+
con.commit()
761782

762-
schema = expr.as_table().schema()
783+
self._run_pre_execute_hooks(expr)
763784

785+
schema = expr.as_table().schema().to_pyarrow()
764786
query = self.compile(expr, limit=limit, params=params)
765-
766-
con = self.con
767-
# server-side cursors need to be uniquely named
768-
cursor = con.cursor(name=util.gen_name("postgres_cursor"))
769-
770-
self._register_hstore(cursor)
771-
772-
try:
773-
cursor.execute(query)
774-
except Exception:
775-
con.rollback()
776-
cursor.close()
777-
raise
778-
779-
# cursor must be passed into the function, otherwise it's liable to be
780-
# cleaned up by GC when the function exists, due to the fact that `_batches`
781-
# is a generator
782-
def _batches(schema: pa.Schema, cursor: psycopg.ServerCursor):
783-
columns = schema.names
784-
try:
785-
while batch := cursor.fetchmany(chunk_size):
786-
yield pa.RecordBatch.from_pandas(
787-
pd.DataFrame(batch, columns=columns), schema=schema
788-
)
789-
except Exception:
790-
con.rollback()
791-
cursor.close()
792-
raise
793-
else:
794-
con.commit()
795-
796-
pa_schema = schema.to_pyarrow()
797-
return pa.RecordBatchReader.from_batches(pa_schema, _batches(pa_schema, cursor))
787+
return pa.RecordBatchReader.from_batches(
788+
schema, _batches(schema=schema, con=self.con, query=query)
789+
)

0 commit comments

Comments
 (0)