Skip to content

Commit 64be7b1

Browse files
committed
feat(datatypes): add as_struct method to convert schemas to structs
1 parent a7aa3a2 commit 64be7b1

File tree

4 files changed

+27
-47
lines changed

4 files changed

+27
-47
lines changed

ibis/backends/base/__init__.py

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,17 @@
2020
MutableMapping,
2121
)
2222

23-
if TYPE_CHECKING:
24-
import pandas as pd
25-
import pyarrow as pa
26-
27-
import ibis.expr.schema as sch
28-
2923
import ibis
3024
import ibis.common.exceptions as exc
3125
import ibis.config
3226
import ibis.expr.operations as ops
3327
import ibis.expr.types as ir
3428
from ibis import util
3529

30+
if TYPE_CHECKING:
31+
import pandas as pd
32+
import pyarrow as pa
33+
3634
__all__ = ('BaseBackend', 'Database', 'connect')
3735

3836

@@ -223,16 +221,6 @@ def _import_pyarrow():
223221
else:
224222
return pyarrow
225223

226-
@staticmethod
227-
def _table_or_column_schema(expr: ir.Expr) -> sch.Schema:
228-
from ibis.backends.pyarrow.datatypes import sch
229-
230-
if isinstance(expr, ir.Table):
231-
return expr.schema()
232-
else:
233-
# ColumnExpr has no schema method, define single-column schema
234-
return sch.schema([(expr.get_name(), expr.type())])
235-
236224
@util.experimental
237225
def to_pyarrow(
238226
self,
@@ -275,7 +263,7 @@ def to_pyarrow(
275263
except ValueError:
276264
# The pyarrow batches iterator is empty so pass in an empty
277265
# iterator and a pyarrow schema
278-
schema = self._table_or_column_schema(expr)
266+
schema = expr.as_table().schema()
279267
table = pa.Table.from_batches([], schema=schema.to_pyarrow())
280268

281269
if isinstance(expr, ir.Table):

ibis/backends/base/sql/__init__.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def to_pyarrow_batches(
154154
params: Mapping[ir.Scalar, Any] | None = None,
155155
limit: int | str | None = None,
156156
chunk_size: int = 1_000_000,
157-
**kwargs: Any,
157+
**_: Any,
158158
) -> pa.ipc.RecordBatchReader:
159159
"""Execute expression and return an iterator of pyarrow record batches.
160160
@@ -172,31 +172,25 @@ def to_pyarrow_batches(
172172
Mapping of scalar parameter expressions to value.
173173
chunk_size
174174
Maximum number of rows in each returned record batch.
175-
kwargs
176-
Keyword arguments
177175
178176
Returns
179177
-------
180-
results
181-
RecordBatchReader
178+
RecordBatchReader
179+
Collection of pyarrow `RecordBatch`s.
182180
"""
183181
pa = self._import_pyarrow()
184182

185-
from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct
186-
187-
schema = self._table_or_column_schema(expr)
188-
189-
def _batches():
183+
schema = expr.as_table().schema()
184+
array_type = schema.as_struct().to_pyarrow()
185+
arrays = (
186+
pa.array(map(tuple, batch), type=array_type)
190187
for batch in self._cursor_batches(
191188
expr, params=params, limit=limit, chunk_size=chunk_size
192-
):
193-
struct_array = pa.array(
194-
map(tuple, batch),
195-
type=ibis_to_pyarrow_struct(schema),
196-
)
197-
yield pa.RecordBatch.from_struct_array(struct_array)
189+
)
190+
)
191+
batches = map(pa.RecordBatch.from_struct_array, arrays)
198192

199-
return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), _batches())
193+
return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batches)
200194

201195
def execute(
202196
self,

ibis/backends/clickhouse/__init__.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -303,21 +303,16 @@ def to_pyarrow_batches(
303303
"""
304304
pa = self._import_pyarrow()
305305

306-
from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct
307-
308-
schema = self._table_or_column_schema(expr)
309-
310-
def _batches():
306+
schema = expr.as_table().schema()
307+
array_type = schema.as_struct().to_pyarrow()
308+
batches = (
309+
pa.RecordBatch.from_struct_array(pa.array(batch, type=array_type))
311310
for batch in self._cursor_batches(
312311
expr, params=params, limit=limit, chunk_size=chunk_size
313-
):
314-
struct_array = pa.array(
315-
map(tuple, batch),
316-
type=ibis_to_pyarrow_struct(schema),
317-
)
318-
yield pa.RecordBatch.from_struct_array(struct_array)
319-
320-
return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), _batches())
312+
)
313+
)
314+
315+
return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batches)
321316

322317
def _cursor_batches(
323318
self,

ibis/expr/schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ def to_pyarrow(self):
215215

216216
return ibis_to_pyarrow_schema(self)
217217

218+
def as_struct(self) -> dt.Struct:
219+
return dt.Struct(self.names, self.types)
220+
218221
def __gt__(self, other: Schema) -> bool:
219222
"""Return whether `self` is a strict superset of `other`."""
220223
return set(self.items()) > set(other.items())

0 commit comments

Comments (0)