26
26
from ibis .backends .duckdb .compiler import DuckDBSQLCompiler
27
27
from ibis .backends .duckdb .datatypes import parse
28
28
29
- _dialect = sa .dialects .postgresql .dialect ()
30
-
31
29
# counters for in-memory, parquet, and csv reads
32
30
# used if no table name is specified
33
31
pd_n = itertools .count (0 )
@@ -42,10 +40,18 @@ def normalize_filenames(source_list):
42
40
return list (map (util .normalize_filename , source_list ))
43
41
44
42
45
- def _format_kwargs (kwargs ):
46
- return (
47
- f"{ k } ='{ v } '" if isinstance (v , str ) else f"{ k } ={ v !r} " for k , v in kwargs .items ()
48
- )
43
+ def _create_view (* args , ** kwargs ):
44
+ import sqlalchemy_views as sav
45
+
46
+ return sav .CreateView (* args , ** kwargs )
47
+
48
+
49
+ def _format_kwargs (kwargs : Mapping [str , Any ]):
50
+ bindparams , pieces = [], []
51
+ for name , value in kwargs .items ():
52
+ bindparams .append (sa .bindparam (name , value ))
53
+ pieces .append (f"{ name } = :{ name } " )
54
+ return sa .text (", " .join (pieces )).bindparams (* bindparams )
49
55
50
56
51
57
class Backend (BaseAlchemyBackend ):
@@ -189,14 +195,13 @@ def register(
189
195
elif first .startswith (("postgres://" , "postgresql://" )):
190
196
return self .read_postgres (source , table_name = table_name , ** kwargs )
191
197
else :
192
- self ._register_failure ()
193
- return None
198
+ self ._register_failure () # noqa: RET503
194
199
195
200
def _register_failure (self ):
196
201
import inspect
197
202
198
203
msg = ", " .join (
199
- m [ 0 ] for m in inspect .getmembers (self ) if m [ 0 ] .startswith ("read_" )
204
+ name for name , _ in inspect .getmembers (self ) if name .startswith ("read_" )
200
205
)
201
206
raise ValueError (
202
207
f"Cannot infer appropriate read function for input, "
@@ -219,7 +224,7 @@ def read_csv(
219
224
table_name
220
225
An optional name to use for the created table. This defaults to
221
226
a sequentially generated name.
222
- ** kwargs
227
+ kwargs
223
228
Additional keyword arguments passed to DuckDB loading function.
224
229
See https://duckdb.org/docs/data/csv for more information.
225
230
@@ -233,24 +238,19 @@ def read_csv(
233
238
if not table_name :
234
239
table_name = f"ibis_read_csv_{ next (csv_n )} "
235
240
236
- quoted_table_name = self ._quote (table_name )
237
-
238
241
# auto_detect and columns collide, so we set auto_detect=True
239
242
# unless COLUMNS has been specified
240
- args = [
241
- str (source_list ),
242
- f"auto_detect={ kwargs .pop ('auto_detect' , 'columns' not in kwargs )} " ,
243
- * _format_kwargs (kwargs ),
244
- ]
245
- sql = f"""CREATE OR REPLACE VIEW { quoted_table_name } AS
246
- SELECT * FROM read_csv({ ', ' .join (args )} )"""
247
-
248
243
if any (source .startswith (("http://" , "https://" )) for source in source_list ):
249
244
self ._load_extensions (["httpfs" ])
250
245
246
+ kwargs ["auto_detect" ] = kwargs .pop ("auto_detect" , "columns" not in kwargs )
247
+ source = sa .select (sa .literal_column ("*" )).select_from (
248
+ sa .func .read_csv (sa .func .list_value (* source_list ), _format_kwargs (kwargs ))
249
+ )
250
+ view = _create_view (sa .table (table_name ), source , or_replace = True )
251
251
with self .begin () as con :
252
- con .execute (sa . text ( sql ) )
253
- return self ._read (table_name )
252
+ con .execute (view )
253
+ return self .table (table_name )
254
254
255
255
def read_parquet (
256
256
self ,
@@ -268,7 +268,7 @@ def read_parquet(
268
268
table_name
269
269
An optional name to use for the created table. This defaults to
270
270
a sequentially generated name.
271
- ** kwargs
271
+ kwargs
272
272
Additional keyword arguments passed to DuckDB loading function.
273
273
See https://duckdb.org/docs/data/parquet for more information.
274
274
@@ -302,17 +302,20 @@ def read_parquet(
302
302
source .startswith (("http://" , "https://" )) for source in source_list
303
303
):
304
304
self ._load_extensions (["httpfs" ])
305
- dataset = str (source_list )
306
- table_name = table_name or f"ibis_read_parquet_{ next (pa_n )} "
307
305
308
- quoted_table_name = self ._quote (table_name )
309
- sql = f"""CREATE OR REPLACE VIEW { quoted_table_name } AS
310
- SELECT * FROM read_parquet({ dataset } )"""
306
+ if table_name is None :
307
+ table_name = f"ibis_read_parquet_{ next (pa_n )} "
311
308
309
+ source = sa .select (sa .literal_column ("*" )).select_from (
310
+ sa .func .read_parquet (
311
+ sa .func .list_value (* source_list ), _format_kwargs (kwargs )
312
+ )
313
+ )
314
+ view = _create_view (sa .table (table_name ), source , or_replace = True )
312
315
with self .begin () as con :
313
- con .execute (sa . text ( sql ) )
316
+ con .execute (view )
314
317
315
- return self ._read (table_name )
318
+ return self .table (table_name )
316
319
317
320
def read_in_memory (
318
321
self , dataframe : pd .DataFrame | pa .Table , table_name : str | None = None
@@ -336,9 +339,9 @@ def read_in_memory(
336
339
with self .begin () as con :
337
340
con .connection .register (table_name , dataframe )
338
341
339
- return self ._read (table_name )
342
+ return self .table (table_name )
340
343
341
- def read_postgres (self , uri , table_name = None ):
344
+ def read_postgres (self , uri , table_name : str | None = None , schema : str = "public" ):
342
345
"""Register a table from a postgres instance into a DuckDB table.
343
346
344
347
Parameters
@@ -347,6 +350,8 @@ def read_postgres(self, uri, table_name=None):
347
350
The postgres URI in form 'postgres://user:password@host:port'
348
351
table_name
349
352
The table to read
353
+ schema
354
+ PostgreSQL schema where `table_name` resides
350
355
351
356
Returns
352
357
-------
@@ -358,18 +363,13 @@ def read_postgres(self, uri, table_name=None):
358
363
"`table_name` is required when registering a postgres table"
359
364
)
360
365
self ._load_extensions (["postgres_scanner" ])
361
- quoted_table_name = self ._quote (table_name )
362
- sql = (
363
- f"CREATE OR REPLACE VIEW { quoted_table_name } AS "
364
- f"SELECT * FROM postgres_scan_pushdown('{ uri } ', 'public', '{ table_name } ')"
366
+ source = sa .select (sa .literal_column ("*" )).select_from (
367
+ sa .func .postgres_scan_pushdown (uri , schema , table_name )
365
368
)
369
+ view = _create_view (sa .table (table_name ), source , or_replace = True )
366
370
with self .begin () as con :
367
- con .execute (sa .text (sql ))
368
-
369
- return self ._read (table_name )
371
+ con .execute (view )
370
372
371
- def _read (self , table_name ):
372
- _table = self .table (table_name )
373
373
return self .table (table_name )
374
374
375
375
def to_pyarrow_batches (
@@ -379,7 +379,7 @@ def to_pyarrow_batches(
379
379
params : Mapping [ir .Scalar , Any ] | None = None ,
380
380
limit : int | str | None = None ,
381
381
chunk_size : int = 1_000_000 ,
382
- ** kwargs : Any ,
382
+ ** _ : Any ,
383
383
) -> pa .ipc .RecordBatchReader :
384
384
# TODO: duckdb seems to not care about the `chunk_size` argument
385
385
# and returns batches in 1024 row chunks
@@ -401,7 +401,7 @@ def to_pyarrow(
401
401
* ,
402
402
params : Mapping [ir .Scalar , Any ] | None = None ,
403
403
limit : int | str | None = None ,
404
- ** kwargs : Any ,
404
+ ** _ : Any ,
405
405
) -> pa .Table :
406
406
pa = self ._import_pyarrow ()
407
407
query_ast = self .compiler .to_ast_ensure_limit (expr , limit , params = params )
0 commit comments