34
34
_gen_table_names = (f"registered_table{ i :d} " for i in itertools .count ())
35
35
36
36
37
- def _name_from_path (path : Path ) -> str :
38
- base , * _ = path . name . partition ( os . extsep )
39
- return base .replace ("- " , "_" )
37
+ def _name_from_path (path : str ) -> str :
38
+ # https://github.com/duckdb/duckdb/issues/5203
39
+ return path .replace (". " , "_" )
40
40
41
41
42
42
def _name_from_dataset (dataset : pa .dataset .FileSystemDataset ) -> str :
@@ -47,49 +47,67 @@ def _quote(name: str):
47
47
return _dialect .identifier_preparer .quote (name )
48
48
49
49
50
- @_generate_view_code .register (r"parquet://(?P<path>.+)" , priority = 10 )
51
- def _parquet (_ , path , table_name = None , ** kwargs ):
52
- path = Path (path ).absolute ()
53
- table_name = table_name or _name_from_path (path )
54
- quoted_table_name = _quote (table_name )
55
- args = [f"'{ str (path )} '" ]
56
- if kwargs :
57
- args .extend ([f"{ k } ={ v } " for k , v in kwargs .items ()])
50
+ def _get_scheme (scheme ):
51
+ if scheme is None or scheme == "file://" :
52
+ return ""
53
+ return scheme
54
+
55
+
56
+ def _format_kwargs (kwargs ):
58
57
return (
59
- f"CREATE OR REPLACE VIEW { quoted_table_name } as SELECT * "
60
- f"from read_parquet({ ', ' .join (args )} )" ,
61
- table_name ,
58
+ f"{ k } ='{ v } '" if isinstance (v , str ) else f"{ k } ={ v !r} " for k , v in kwargs .items ()
62
59
)
63
60
64
61
65
- @_generate_view_code .register (r"csv(?:\.gz)?://(?P<path>.+)" , priority = 10 )
66
- def _csv (_ , path , table_name = None , ** kwargs ):
67
- path = Path (path ).absolute ()
68
- table_name = table_name or _name_from_path (path )
62
+ @_generate_view_code .register (r"parquet://(?P<path>.+)" , priority = 13 )
63
+ def _parquet (_ , path , table_name = None , scheme = None , ** kwargs ):
64
+ scheme = _get_scheme (scheme )
65
+ if not scheme :
66
+ path = os .path .abspath (path )
67
+ if not table_name :
68
+ table_name = _name_from_path (path )
69
69
quoted_table_name = _quote (table_name )
70
- # AUTO_DETECT and COLUMNS collide, so we set AUTO_DETECT=True
70
+ args = [f"'{ scheme } { path } '" , * _format_kwargs (kwargs )]
71
+ code = f"""\
72
+ CREATE OR REPLACE VIEW { quoted_table_name } AS
73
+ SELECT * FROM read_parquet({ ', ' .join (args )} )"""
74
+ return code , table_name , ["parquet" ] + ["httpfs" ] if scheme else []
75
+
76
+
77
+ @_generate_view_code .register (r"(c|t)sv://(?P<path>.+)" , priority = 13 )
78
+ def _csv (_ , path , table_name = None , scheme = None , ** kwargs ):
79
+ scheme = _get_scheme (scheme )
80
+ if not scheme :
81
+ path = os .path .abspath (path )
82
+ if not table_name :
83
+ table_name = _name_from_path (path )
84
+ quoted_table_name = _quote (table_name )
85
+ # auto_detect and columns collide, so we set auto_detect=True
71
86
# unless COLUMNS has been specified
72
- args = [f"'{ str (path )} '" ]
73
- args .extend (
74
- [
75
- f"AUTO_DETECT="
76
- f"{ kwargs .pop ('AUTO_DETECT' , False if 'COLUMNS' in kwargs else True )} "
77
- ]
78
- )
79
- if kwargs :
80
- args .extend ([f"{ k } ={ v } " for k , v in kwargs .items ()])
81
- return (
82
- f"CREATE OR REPLACE VIEW { quoted_table_name } as SELECT * "
83
- f"from read_csv({ ', ' .join (args )} )" ,
84
- table_name ,
85
- )
87
+ args = [
88
+ f"'{ scheme } { path } '" ,
89
+ f"auto_detect={ kwargs .pop ('auto_detect' , 'columns' not in kwargs )} " ,
90
+ * _format_kwargs (kwargs ),
91
+ ]
92
+ code = f"""\
93
+ CREATE OR REPLACE VIEW { quoted_table_name } AS
94
+ SELECT * FROM read_csv({ ', ' .join (args )} )"""
95
+ return code , table_name , ["httpfs" ] if scheme else []
96
+
97
+
98
+ @_generate_view_code .register (
99
+ r"(?P<scheme>(?:file|https?)://)?(?P<path>.+?\.((?:c|t)sv|txt)(?:\.gz)?)" ,
100
+ priority = 12 ,
101
+ )
102
+ def _csv_file_or_url (_ , path , table_name = None , ** kwargs ):
103
+ return _csv (f"csv://{ path } " , path = path , table_name = table_name , ** kwargs )
86
104
87
105
88
- @_generate_view_code .register (r"(?:file://)?(?P<path>.+)" , priority = 9 )
89
- def _file ( _ , path , table_name = None , ** kwargs ):
90
- num_sep_chars = len ( os . extsep )
91
- extension = "" . join ( Path ( path ). suffixes )[ num_sep_chars :]
92
- return _generate_view_code (f"{ extension } ://{ path } " , table_name = table_name , ** kwargs )
106
+ @_generate_view_code .register (
107
+ r"(?P<scheme>(?:file|https?)://)?(?P<path>.+?\.parquet)" , priority = 12
108
+ )
109
+ def _parquet_file_or_url ( _ , path , table_name = None , ** kwargs ):
110
+ return _parquet (f"parquet ://{ path } " , path = path , table_name = table_name , ** kwargs )
93
111
94
112
95
113
@_generate_view_code .register (r"s3://.+" , priority = 10 )
@@ -100,17 +118,16 @@ def _s3(full_path, table_name=None):
100
118
dataset = ds .dataset (full_path )
101
119
table_name = table_name or _name_from_dataset (dataset )
102
120
quoted_table_name = _quote (table_name )
103
- return quoted_table_name , dataset
121
+ return quoted_table_name , dataset , ()
104
122
105
123
106
124
@_generate_view_code .register (r".+" , priority = 1 )
107
- def _default (_ , ** kwargs ):
125
+ def _default (path , ** kwargs ):
108
126
raise ValueError (
109
- """
110
- Unrecognized filetype or extension.
111
- Valid prefixes are parquet://, csv://, s3://, or file://
127
+ f"""Unrecognized file type or extension: { path } .
112
128
113
- Supported filetypes are parquet, csv, and csv.gz
129
+ Valid prefixes are parquet://, csv://, tsv://, s3://, or file://
130
+ Supported file extensions are parquet, csv, tsv, txt, csv.gz, tsv.gz, and txt.gz
114
131
"""
115
132
)
116
133
@@ -180,6 +197,15 @@ def do_connect(
180
197
)
181
198
)
182
199
self ._meta = sa .MetaData (bind = self .con )
200
+ self ._extensions = set ()
201
+
202
+ def _load_extensions (self , extensions ):
203
+ for extension in extensions :
204
+ if extension not in self ._extensions :
205
+ with self .con .connect () as con :
206
+ con .execute (f"INSTALL '{ extension } '" )
207
+ con .execute (f"LOAD '{ extension } '" )
208
+ self ._extensions .add (extension )
183
209
184
210
def register (
185
211
self ,
@@ -210,7 +236,10 @@ def register(
210
236
The just-registered table
211
237
"""
212
238
if isinstance (source , str ) and source .startswith ("s3://" ):
213
- table_name , dataset = _generate_view_code (source , table_name = table_name )
239
+ table_name , dataset , extensions_required = _generate_view_code (
240
+ source , table_name = table_name
241
+ )
242
+ self ._load_extensions (extensions_required )
214
243
# We don't create a view since DuckDB special cases Arrow Datasets
215
244
# so if we also create a view we end up with both a "lazy table"
216
245
# and a view with the same name
@@ -221,9 +250,10 @@ def register(
221
250
# explicitly.
222
251
cursor .cursor .c .register (table_name , dataset )
223
252
elif isinstance (source , (str , Path )):
224
- sql , table_name = _generate_view_code (
253
+ sql , table_name , extensions_required = _generate_view_code (
225
254
str (source ), table_name = table_name , ** kwargs
226
255
)
256
+ self ._load_extensions (extensions_required )
227
257
self .con .execute (sql )
228
258
else :
229
259
if table_name is None :
0 commit comments