
Commit d4944a3

✨ [destination-DuckDB] Improve performance: use pyarrow batch insert as a replacement for executemany (#36715)
1 parent: dfc933a

File tree: 6 files changed, +277 -42 lines


airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py (+32 -26)
@@ -9,9 +9,10 @@
 import uuid
 from collections import defaultdict
 from logging import getLogger
-from typing import Any, Iterable, Mapping
+from typing import Any, Dict, Iterable, List, Mapping

 import duckdb
+import pyarrow as pa
 from airbyte_cdk import AirbyteLogger
 from airbyte_cdk.destinations import Destination
 from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
@@ -109,53 +110,58 @@ def write(

         con.execute(query)

-        buffer = defaultdict(list)
+        buffer = defaultdict(lambda: defaultdict(list))

         for message in input_messages:
             if message.type == Type.STATE:
                 # flush the buffer
                 for stream_name in buffer.keys():
                     logger.info(f"flushing buffer for state: {message}")
-                    table_name = f"_airbyte_raw_{stream_name}"
-                    query = f"""
-                    INSERT INTO {schema_name}.{table_name}
-                        (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                    VALUES (?,?,?)
-                    """
-                    con.executemany(query, buffer[stream_name])
+                    DestinationDuckdb._safe_write(con=con, buffer=buffer, schema_name=schema_name, stream_name=stream_name)

-                con.commit()
-                buffer = defaultdict(list)
+                buffer = defaultdict(lambda: defaultdict(list))

                 yield message
             elif message.type == Type.RECORD:
                 data = message.record.data
-                stream = message.record.stream
-                if stream not in streams:
-                    logger.debug(f"Stream {stream} was not present in configured streams, skipping")
+                stream_name = message.record.stream
+                if stream_name not in streams:
+                    logger.debug(f"Stream {stream_name} was not present in configured streams, skipping")
                     continue
-
                 # add to buffer
-                buffer[stream].append(
-                    (
-                        str(uuid.uuid4()),
-                        datetime.datetime.now().isoformat(),
-                        json.dumps(data),
-                    )
-                )
+                buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
+                buffer[stream_name]["_airbyte_emitted_at"].append(datetime.datetime.now().isoformat())
+                buffer[stream_name]["_airbyte_data"].append(json.dumps(data))
+
             else:
                 logger.info(f"Message type {message.type} not supported, skipping")

         # flush any remaining messages
         for stream_name in buffer.keys():
-            table_name = f"_airbyte_raw_{stream_name}"
+            DestinationDuckdb._safe_write(con=con, buffer=buffer, schema_name=schema_name, stream_name=stream_name)
+
+    @staticmethod
+    def _safe_write(*, con: duckdb.DuckDBPyConnection, buffer: Dict[str, Dict[str, List[Any]]], schema_name: str, stream_name: str):
+        table_name = f"_airbyte_raw_{stream_name}"
+        try:
+            pa_table = pa.Table.from_pydict(buffer[stream_name])
+        except Exception:
+            logger.exception(
+                "Writing with pyarrow view failed, falling back to writing with executemany. Expect some performance degradation."
+            )
             query = f"""
             INSERT INTO {schema_name}.{table_name}
+                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
             VALUES (?,?,?)
             """
-
-            con.executemany(query, buffer[stream_name])
-            con.commit()
+            entries_to_write = buffer[stream_name]
+            con.executemany(
+                query, zip(entries_to_write["_airbyte_ab_id"], entries_to_write["_airbyte_emitted_at"], entries_to_write["_airbyte_data"])
+            )
+        else:
+            # DuckDB will automatically find and SELECT from the `pa_table`
+            # local variable defined above.
+            con.sql(f"INSERT INTO {schema_name}.{table_name} SELECT * FROM pa_table")

     def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
         """

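The `try`/`except` exists because `pa.Table.from_pydict` can reject a buffer; the commit does not enumerate the failure modes, but ragged column lists are one plausible example, and any such error routes `_safe_write` down the slower `executemany` fallback. A hypothetical illustration:

    import pyarrow as pa

    # from_pydict requires equal-length columns; unequal lengths raise
    # ArrowInvalid, which would trigger the executemany fallback above.
    try:
        pa.Table.from_pydict({"a": [1, 2], "b": [1]})
    except pa.ArrowInvalid as exc:
        print(f"falling back: {exc}")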
airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py (+118 -3)
@@ -9,9 +9,10 @@
 import random
 import string
 import tempfile
+import time
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict
+from typing import Any, Callable, Dict, Generator, Iterable
 from unittest.mock import MagicMock

 import duckdb
@@ -30,6 +31,7 @@
 )
 from destination_duckdb import DestinationDuckdb
 from destination_duckdb.destination import CONFIG_MOTHERDUCK_API_KEY
+from faker import Faker

 CONFIG_PATH = "integration_tests/config.json"
 SECRETS_CONFIG_PATH = (
@@ -96,6 +98,12 @@ def test_table_name() -> str:
     return f"airbyte_integration_{rand_string}"


+@pytest.fixture
+def test_large_table_name() -> str:
+    letters = string.ascii_lowercase
+    rand_string = "".join(random.choice(letters) for _ in range(10))
+    return f"airbyte_integration_{rand_string}"
+
 @pytest.fixture
 def table_schema() -> str:
     schema = {"type": "object", "properties": {"column1": {"type": ["null", "string"]}}}
@@ -104,7 +112,7 @@ def table_schema() -> str:

 @pytest.fixture
 def configured_catalogue(
-    test_table_name: str, table_schema: str
+    test_table_name: str, test_large_table_name: str, table_schema: str,
 ) -> ConfiguredAirbyteCatalog:
     append_stream = ConfiguredAirbyteStream(
         stream=AirbyteStream(
@@ -115,7 +123,16 @@
         sync_mode=SyncMode.incremental,
         destination_sync_mode=DestinationSyncMode.append,
     )
-    return ConfiguredAirbyteCatalog(streams=[append_stream])
+    append_stream_large = ConfiguredAirbyteStream(
+        stream=AirbyteStream(
+            name=test_large_table_name,
+            json_schema=table_schema,
+            supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
+        ),
+        sync_mode=SyncMode.incremental,
+        destination_sync_mode=DestinationSyncMode.append,
+    )
+    return ConfiguredAirbyteCatalog(streams=[append_stream, append_stream_large])


 @pytest.fixture
@@ -206,3 +223,101 @@ def test_write(
     assert len(result) == 2
     assert result[0][2] == json.dumps(airbyte_message1.record.data)
     assert result[1][2] == json.dumps(airbyte_message2.record.data)
+
+def _airbyte_messages(n: int, batch_size: int, table_name: str) -> Generator[AirbyteMessage, None, None]:
+    fake = Faker()
+    Faker.seed(0)
+
+    for i in range(n):
+        if i != 0 and i % batch_size == 0:
+            yield AirbyteMessage(
+                type=Type.STATE, state=AirbyteStateMessage(data={"state": str(i // batch_size)})
+            )
+        else:
+            message = AirbyteMessage(
+                type=Type.RECORD,
+                record=AirbyteRecordMessage(
+                    stream=table_name,
+                    data={"key1": fake.first_name(), "key2": fake.ssn()},
+                    emitted_at=int(datetime.now().timestamp()) * 1000,
+                ),
+            )
+            yield message
+
+
+def _airbyte_messages_with_inconsistent_json_fields(n: int, batch_size: int, table_name: str) -> Generator[AirbyteMessage, None, None]:
+    fake = Faker()
+    Faker.seed(0)
+    random.seed(0)
+
+    for i in range(n):
+        if i != 0 and i % batch_size == 0:
+            yield AirbyteMessage(
+                type=Type.STATE, state=AirbyteStateMessage(data={"state": str(i // batch_size)})
+            )
+        else:
+            message = AirbyteMessage(
+                type=Type.RECORD,
+                record=AirbyteRecordMessage(
+                    stream=table_name,
+                    # Throw in empty nested objects and see how pyarrow deals with them.
+                    data={
+                        "key1": fake.first_name(),
+                        "key2": fake.ssn() if random.random() < 0.5 else random.randrange(1000, 9999999999999),
+                        "nested1": {} if random.random() < 0.1 else {
+                            "key3": fake.first_name(),
+                            "key4": fake.ssn() if random.random() < 0.5 else random.randrange(1000, 9999999999999),
+                            "dictionary1": {} if random.random() < 0.1 else {
+                                "key3": fake.first_name(),
+                                "key4": "True" if random.random() < 0.5 else True,
+                            },
+                        },
+                    } if random.random() < 0.9 else {},
+                    emitted_at=int(datetime.now().timestamp()) * 1000,
+                ),
+            )
+            yield message
+
+
+TOTAL_RECORDS = 5_000
+BATCH_WRITE_SIZE = 1000
+
+@pytest.mark.slow
+@pytest.mark.parametrize("airbyte_message_generator,explanation",
+    [(_airbyte_messages, "Test writing a large number of simple json objects."),
+     (_airbyte_messages_with_inconsistent_json_fields, "Test writing a large number of json messages with inconsistent schema.")])
+def test_large_number_of_writes(
+    config: Dict[str, str],
+    request,
+    configured_catalogue: ConfiguredAirbyteCatalog,
+    test_large_table_name: str,
+    test_schema_name: str,
+    airbyte_message_generator: Callable[[int, int, str], Iterable[AirbyteMessage]],
+    explanation: str,
+):
+    destination = DestinationDuckdb()
+    generator = destination.write(
+        config,
+        configured_catalogue,
+        airbyte_message_generator(TOTAL_RECORDS, BATCH_WRITE_SIZE, test_large_table_name),
+    )
+
+    result = list(generator)
+    assert len(result) == TOTAL_RECORDS // (BATCH_WRITE_SIZE + 1)
+    motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, ""))
+    duckdb_config = {}
+    if motherduck_api_key:
+        duckdb_config["motherduck_token"] = motherduck_api_key
+        duckdb_config["custom_user_agent"] = "airbyte_intg_test"
+
+    con = duckdb.connect(
+        database=config.get("destination_path"), read_only=False, config=duckdb_config
+    )
+    with con:
+        cursor = con.execute(
+            "SELECT count(1) "
+            f"FROM {test_schema_name}._airbyte_raw_{test_large_table_name}"
+        )
+        result = cursor.fetchall()
+        assert result[0][0] == TOTAL_RECORDS - TOTAL_RECORDS // (BATCH_WRITE_SIZE + 1)
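The two count assertions follow from the generator's branching: with TOTAL_RECORDS = 5,000 and BATCH_WRITE_SIZE = 1,000, a state message replaces a record at i = 1000, 2000, 3000, and 4000, so write() yields 4 state messages (and 5000 // 1001 == 4), leaving 4,996 records in the raw table. A quick standalone check of that arithmetic, using only the generator's own condition:

    n, batch_size = 5_000, 1_000
    state_count = sum(1 for i in range(n) if i != 0 and i % batch_size == 0)
    assert state_count == 4 == n // (batch_size + 1)  # state messages yielded by write()
    assert n - state_count == 4_996                   # records persisted to the raw table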

airbyte-integrations/connectors/destination-duckdb/metadata.yaml (+1 -1)
@@ -4,7 +4,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610
-  dockerImageTag: 0.3.3
+  dockerImageTag: 0.3.4
   dockerRepository: airbyte/destination-duckdb
   githubIssueLabel: destination-duckdb
   icon: duckdb.svg
