@@ -308,7 +308,7 @@ def to_pyarrow_batches(
         params: Mapping[ir.Scalar, Any] | None = None,
         external_tables: Mapping[str, Any] | None = None,
         chunk_size: int = 1_000_000,
-        **_: Any,
+        **kwargs: Any,
     ) -> pa.ipc.RecordBatchReader:
         """Execute expression and return an iterator of pyarrow record batches.

@@ -328,6 +328,8 @@ def to_pyarrow_batches(
             External data
         chunk_size
             Maximum number of rows to return in a single chunk
+        kwargs
+            Extra arguments passed directly to clickhouse-connect

         Returns
         -------
@@ -357,14 +359,17 @@ def to_pyarrow_batches(
         external_tables = self._collect_in_memory_tables(expr, external_tables)
         external_data = self._normalize_external_tables(external_tables)

-        def batcher(sql: str, *, schema: pa.Schema) -> Iterator[pa.RecordBatch]:
-            settings = {}
+        settings = kwargs.pop("settings", {})

-            # readonly != 1 means that the server setting is writable
-            if self.con.server_settings["max_block_size"].readonly != 1:
-                settings["max_block_size"] = chunk_size
+        # readonly != 1 means that the server setting is writable
+        if self.con.server_settings["max_block_size"].readonly != 1:
+            settings["max_block_size"] = chunk_size
+
+        def batcher(
+            sql: str, *, schema: pa.Schema, settings, **kwargs
+        ) -> Iterator[pa.RecordBatch]:
             with self.con.query_column_block_stream(
-                sql, external_data=external_data, settings=settings
+                sql, external_data=external_data, settings=settings, **kwargs
             ) as blocks:
                 yield from map(
                     partial(pa.RecordBatch.from_arrays, schema=schema), blocks
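
A minimal usage sketch of the new pass-through, assuming a local ClickHouse server and a hypothetical `events` table (the return-site wiring that feeds `batcher` appears in the next hunk):

```python
# Sketch only: the host, the "events" table, and the chosen setting are
# assumptions for illustration, not part of this diff.
import ibis

con = ibis.clickhouse.connect(host="localhost")
expr = con.table("events")

# "settings" is popped off kwargs and, when the server allows it, augmented
# with max_block_size from chunk_size; any remaining keyword is forwarded
# verbatim to clickhouse-connect's query_column_block_stream.
reader = con.to_pyarrow_batches(
    expr,
    chunk_size=100_000,
    settings={"max_threads": 4},
)
for batch in reader:
    print(batch.num_rows)  # each item is a pyarrow.RecordBatch
```
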
@@ -373,29 +378,34 @@ def batcher(sql: str, *, schema: pa.Schema) -> Iterator[pa.RecordBatch]:
         self._log(sql)
         schema = table.schema().to_pyarrow()
         return pa.ipc.RecordBatchReader.from_batches(
-            schema, batcher(sql, schema=schema)
+            schema, batcher(sql, schema=schema, settings=settings, **kwargs)
         )

     def execute(
         self,
         expr: ir.Expr,
         limit: str | None = "default",
+        params: Mapping[ir.Scalar, Any] | None = None,
         external_tables: Mapping[str, pd.DataFrame] | None = None,
         **kwargs: Any,
     ) -> Any:
         """Execute an expression."""
         import pandas as pd

         table = expr.as_table()
-        sql = self.compile(table, limit=limit, **kwargs)
+        sql = self.compile(table, params=params, limit=limit)

         schema = table.schema()
         self._log(sql)

         external_tables = self._collect_in_memory_tables(expr, external_tables)
         external_data = self._normalize_external_tables(external_tables)
         df = self.con.query_df(
-            sql, external_data=external_data, use_na_values=False, use_none=True
+            sql,
+            external_data=external_data,
+            use_na_values=False,
+            use_none=True,
+            **kwargs,
         )

         if df.empty:
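
And a corresponding sketch for the reworked `execute` path: `params` now reaches `compile`, while any extra keywords flow into clickhouse-connect's `query_df` (the table and column names below are hypothetical):

```python
# Sketch only: "events" and "created_at" are illustrative names.
from datetime import datetime

import ibis

con = ibis.clickhouse.connect(host="localhost")
t = con.table("events")

# A deferred scalar parameter, bound at execution time via params=...
cutoff = ibis.param("timestamp")
expr = t.filter(t.created_at > cutoff)

df = con.execute(
    expr,
    params={cutoff: datetime(2024, 1, 1)},
    # forwarded verbatim to con.query_df through **kwargs
    settings={"max_execution_time": 60},
)
```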