
Commit cb17b8b

perf(postgres): improve to_pyarrow_batches by using server-side cursors (#10954)
This adds a dedicated `to_pyarrow_batches` implementation to the PostgreSQL backend that uses server-side cursors. With a server-side cursor, ibis only allocates memory for `chunk_size` rows of the result at a time instead of materializing the whole result set. Resolves #10938
1 parent 21aea97 commit cb17b8b
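
The change is exercised through the existing `to_pyarrow_batches` API on expressions. Below is a minimal sketch of the streaming usage, assuming a reachable PostgreSQL instance; the connection parameters and table name are illustrative, not part of the commit:

import ibis

# Hypothetical connection details; adjust for your environment.
con = ibis.postgres.connect(
    user="postgres", password="postgres", host="localhost", database="ibis_testing"
)
t = con.table("functional_alltypes")

# Each iteration pulls at most `chunk_size` rows through a named (server-side)
# cursor, so memory use is bounded by the chunk rather than the full result set.
reader = t.to_pyarrow_batches(chunk_size=100_000)
print(sum(batch.num_rows for batch in reader))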

File tree

4 files changed, +97 -16 lines


ibis/backends/conftest.py

-15 lines
@@ -5,7 +5,6 @@
 import importlib.metadata
 import itertools
 from functools import cache
-from pathlib import Path
 from typing import TYPE_CHECKING, Any

 import _pytest
@@ -148,20 +147,6 @@
 ALL_BACKENDS = set(_get_backend_names())


-@pytest.fixture(scope="session")
-def data_dir() -> Path:
-    """Return the test data directory.
-
-    Returns
-    -------
-    Path
-        Test data directory
-    """
-    root = Path(__file__).absolute().parents[2]
-
-    return root / "ci" / "ibis-testing-data"
-
-
 def _get_backend_conf(backend_str: str):
     """Convert a backend string to the test class for the backend."""
     conftest = importlib.import_module(f"ibis.backends.{backend_str}.tests.conftest")

ibis/backends/postgres/__init__.py

+51 -1 lines
@@ -28,7 +28,7 @@
 from ibis.backends.sql.compilers.base import TRUE, C, ColGen

 if TYPE_CHECKING:
-    from collections.abc import Callable
+    from collections.abc import Callable, Mapping
     from urllib.parse import ParseResult

     import pandas as pd
@@ -740,3 +740,53 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
         else:
             con.commit()
         return cursor
+
+    @util.experimental
+    def to_pyarrow_batches(
+        self,
+        expr: ir.Expr,
+        /,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1_000_000,
+        **_: Any,
+    ) -> pa.ipc.RecordBatchReader:
+        import pandas as pd
+        import pyarrow as pa
+
+        self._run_pre_execute_hooks(expr)
+
+        schema = expr.as_table().schema()
+
+        query = self.compile(expr, limit=limit, params=params)
+        with contextlib.suppress(AttributeError):
+            query = query.sql(dialect=self.dialect)
+
+        con = self.con
+        # server-side cursors need to be uniquely named
+        cursor = con.cursor(name=util.gen_name("postgres_cursor"))
+
+        try:
+            cursor.execute(query)
+        except Exception:
+            con.rollback()
+            cursor.close()
+            raise
+
+        def _batches(schema: pa.Schema):
+            columns = schema.names
+            try:
+                while batch := cursor.fetchmany(chunk_size):
+                    yield pa.RecordBatch.from_pandas(
+                        pd.DataFrame(batch, columns=columns), schema=schema
+                    )
+            except Exception:
+                con.rollback()
+                cursor.close()
+                raise
+            else:
+                con.commit()
+
+        pa_schema = schema.to_pyarrow()
+        return pa.RecordBatchReader.from_batches(pa_schema, _batches(pa_schema))
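
For context, a hedged sketch of consuming the `RecordBatchReader` returned by the new method; the connection parameters, table name, and Parquet target are illustrative only:

import ibis
import pyarrow.parquet as pq

# Hypothetical connection and table; adjust for your environment.
con = ibis.postgres.connect(
    user="postgres", password="postgres", host="localhost", database="ibis_testing"
)
expr = con.table("functional_alltypes")

# The reader yields batches of at most `chunk_size` rows from a named
# server-side cursor, so a large result can be spilled to disk without
# ever materializing the full table in memory.
reader = con.to_pyarrow_batches(expr, chunk_size=250_000)
with pq.ParquetWriter("functional_alltypes.parquet", reader.schema) as writer:
    for batch in reader:
        writer.write_batch(batch)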

ibis/conftest.py

+15 lines
@@ -3,6 +3,7 @@
 import builtins
 import os
 import platform
+from pathlib import Path

 import pytest

@@ -44,3 +45,17 @@ def add_ibis(monkeypatch, doctest_namespace):
     condition=WINDOWS,
     reason="windows prevents two connections to the same file even in the same process",
 )
+
+
+@pytest.fixture(scope="session")
+def data_dir() -> Path:
+    """Return the test data directory.
+
+    Returns
+    -------
+    Path
+        Test data directory
+    """
+    root = Path(__file__).absolute().parents[1]
+
+    return root / "ci" / "ibis-testing-data"

ibis/tests/benchmarks/test_benchmarks.py

+31 lines
@@ -1019,3 +1019,34 @@ def test_dedup_schema(benchmark):
             itertools.cycle(("int", "string", "array<int>", "float")),
         ),
     )
+
+
+@pytest.fixture(scope="session")
+def pgtable(data_dir):
+    pd = pytest.importorskip("pandas")
+    pytest.importorskip("psycopg")
+
+    from ibis.backends.postgres.tests.conftest import (
+        IBIS_TEST_POSTGRES_DB,
+        PG_HOST,
+        PG_PASS,
+        PG_PORT,
+        PG_USER,
+    )
+
+    con = ibis.postgres.connect(
+        user=PG_USER,
+        password=PG_PASS,
+        host=PG_HOST,
+        port=PG_PORT,
+        database=IBIS_TEST_POSTGRES_DB,
+    )
+    name = ibis.util.gen_name("functional_alltypes_bench")
+    yield con.create_table(
+        name, obj=pd.read_csv(data_dir / "csv" / "functional_alltypes.csv"), temp=True
+    )
+    con.disconnect()
+
+
+def test_postgres_record_batches(pgtable, benchmark):
+    benchmark(pgtable.to_pyarrow)
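
A hedged sketch of a companion benchmark that drains the reader batch by batch instead of materializing the whole table; this variant is illustrative only and not part of the commit:

def test_postgres_record_batches_streaming(pgtable, benchmark):
    # Hypothetical variant: count rows while streaming, so at most
    # `chunk_size` rows are buffered on the client at any time.
    def drain():
        reader = pgtable.to_pyarrow_batches(chunk_size=100_000)
        return sum(batch.num_rows for batch in reader)

    benchmark(drain)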
