Skip to content

Commit 48af92a

Browse files
authored
Concurrent CDK: if exception is AirbyteTracedException, raise this and not StreamThreadException (#37443)
1 parent ebb57f0 commit 48af92a

File tree

2 files changed

+56
-4
lines changed

2 files changed

+56
-4
lines changed

airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,12 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
148148
"""
149149
self._flag_exception(exception.stream_name, exception.exception)
150150
self._logger.exception(f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception)
151-
yield AirbyteTracedException.from_exception(
152-
exception, stream_descriptor=StreamDescriptor(name=exception.stream_name)
153-
).as_airbyte_message()
151+
152+
stream_descriptor = StreamDescriptor(name=exception.stream_name)
153+
if isinstance(exception.exception, AirbyteTracedException):
154+
yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
155+
else:
156+
yield AirbyteTracedException.from_exception(exception, stream_descriptor=stream_descriptor).as_airbyte_message()
154157

155158
def _flag_exception(self, stream_name: str, exception: Exception) -> None:
156159
self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

+50-1
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,56 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
548548

549549
exception_messages = list(handler.on_exception(exception))
550550
assert len(exception_messages) == 1
551-
assert exception_messages[0].type == MessageType.TRACE
551+
assert "StreamThreadException" in exception_messages[0].trace.error.stack_trace
552+
553+
assert list(handler.on_partition_complete_sentinel(PartitionCompleteSentinel(self._an_open_partition))) == [
554+
AirbyteMessage(
555+
type=MessageType.TRACE,
556+
trace=AirbyteTraceMessage(
557+
type=TraceType.STREAM_STATUS,
558+
emitted_at=1577836800000.0,
559+
stream_status=AirbyteStreamStatusTraceMessage(
560+
stream_descriptor=StreamDescriptor(name=_STREAM_NAME), status=AirbyteStreamStatus(AirbyteStreamStatus.INCOMPLETE)
561+
),
562+
),
563+
)
564+
]
565+
with pytest.raises(AirbyteTracedException):
566+
handler.is_done()
567+
568+
@freezegun.freeze_time("2020-01-01T00:00:00")
569+
def test_given_underlying_exception_is_traced_exception_on_exception_return_trace_message_and_on_stream_complete_return_stream_status(self):
570+
stream_instances_to_read_from = [self._stream, self._another_stream]
571+
572+
handler = ConcurrentReadProcessor(
573+
stream_instances_to_read_from,
574+
self._partition_enqueuer,
575+
self._thread_pool_manager,
576+
self._logger,
577+
self._slice_logger,
578+
self._message_repository,
579+
self._partition_reader,
580+
)
581+
582+
handler.start_next_partition_generator()
583+
handler.on_partition(self._an_open_partition)
584+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._stream)))
585+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._another_stream)))
586+
587+
another_stream = Mock(spec=AbstractStream)
588+
another_stream.name = _STREAM_NAME
589+
another_stream.as_airbyte_stream.return_value = AirbyteStream(
590+
name=_ANOTHER_STREAM_NAME,
591+
json_schema={},
592+
supported_sync_modes=[SyncMode.full_refresh],
593+
)
594+
595+
underlying_exception = AirbyteTracedException()
596+
exception = StreamThreadException(underlying_exception, _STREAM_NAME)
597+
598+
exception_messages = list(handler.on_exception(exception))
599+
assert len(exception_messages) == 1
600+
assert "AirbyteTracedException" in exception_messages[0].trace.error.stack_trace
552601

553602
assert list(handler.on_partition_complete_sentinel(PartitionCompleteSentinel(self._an_open_partition))) == [
554603
AirbyteMessage(

0 commit comments

Comments
 (0)