Skip to content

Commit b5d3236

Browse files
authored
Handle logging non-JSON-serializable classes in stream slices (#22118)
* Add failing test * handle unserializable classes in stream slices * format
1 parent 7f28751 commit b5d3236

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ def _read_incremental(
243243
has_slices = True
244244
if logger.isEnabledFor(logging.DEBUG):
245245
yield AirbyteMessage(
246-
type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
246+
type=MessageType.LOG,
247+
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
247248
)
248249
records = stream_instance.read_records(
249250
sync_mode=SyncMode.incremental,
@@ -295,7 +296,8 @@ def _read_full_refresh(
295296
for _slice in slices:
296297
if logger.isEnabledFor(logging.DEBUG):
297298
yield AirbyteMessage(
298-
type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
299+
type=MessageType.LOG,
300+
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
299301
)
300302
record_data_or_messages = stream_instance.read_records(
301303
stream_slice=_slice,

airbyte-cdk/python/unit_tests/sources/test_abstract_source.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import copy
6+
import datetime
67
import logging
78
from collections import defaultdict
89
from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
@@ -331,11 +332,17 @@ def test_valid_full_refresh_read_with_slices(mocker):
331332
assert expected == messages
332333

333334

334-
def test_read_full_refresh_with_slices_sends_slice_messages(mocker):
335+
@pytest.mark.parametrize(
336+
"slices",
337+
[
338+
[{"1": "1"}, {"2": "2"}],
339+
[{"date": datetime.date(year=2023, month=1, day=1)}, {"date": datetime.date(year=2023, month=1, day=1)}]
340+
]
341+
)
342+
def test_read_full_refresh_with_slices_sends_slice_messages(mocker, slices):
335343
"""Given the logger is debug and a full refresh, AirbyteMessages are sent for slices"""
336344
debug_logger = logging.getLogger("airbyte.debug")
337345
debug_logger.setLevel(logging.DEBUG)
338-
slices = [{"1": "1"}, {"2": "2"}]
339346
stream = MockStream(
340347
[({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
341348
name="s1",

0 commit comments

Comments
 (0)