File tree Expand file tree Collapse file tree 2 files changed +23
-3
lines changed Expand file tree Collapse file tree 2 files changed +23
-3
lines changed Original file line number Diff line number Diff line change 5
5
from __future__ import annotations
6
6
7
7
from pathlib import Path
8
+ from textwrap import dedent
8
9
from typing import cast
9
10
10
11
from overrides import overrides
@@ -158,6 +159,25 @@ def _write_files_to_new_table(
158
159
) -> str :
159
160
"""Write a file(s) to a new table.
160
161
161
- TODO: Optimize this for DuckDB instead of calling the base implementation.
162
+ We use DuckDB's `read_parquet` function to efficiently read the files and insert
163
+ them into the table in a single operation.
164
+
165
+ Note: This implementation is fragile in regards to column ordering. However, since
166
+ we are inserting into a temp table we have just created, there should be no
167
+ drift between the table schema and the file schema.
162
168
"""
163
- return super ()._write_files_to_new_table (files , stream_name , batch_id )
169
+ temp_table_name = self ._create_table_for_loading (
170
+ stream_name = stream_name ,
171
+ batch_id = batch_id ,
172
+ )
173
+ files_list = ", " .join ([f"'{ f !s} '" for f in files ])
174
+ insert_statement = dedent (
175
+ f"""
176
+ INSERT INTO { self .config .schema_name } .{ temp_table_name }
177
+ SELECT * FROM read_parquet(
178
+ [{ files_list } ]
179
+ )
180
+ """
181
+ )
182
+ self ._execute_sql (insert_statement )
183
+ return temp_table_name
Original file line number Diff line number Diff line change 12
12
import airbyte_lib as ab
13
13
14
14
15
- SCALE = 1_000_000 # Number of records to generate between users and purchases.
15
+ SCALE = 5_000_000 # Number of records to generate between users and purchases.
16
16
17
17
18
18
source = ab .get_connector (
You can’t perform that action at this time.
0 commit comments