Skip to content

Commit a900c78

Browse files
authored
Add StreamDescriptor as params of AirbyteTracedException.__init__ (#37108)
1 parent 6d5ecca commit a900c78

File tree

5 files changed

+111
-25
lines changed

5 files changed

+111
-25
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,14 @@ def read(
159159
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
160160
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
161161
display_message = stream_instance.get_error_display_message(e)
162+
stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
162163
if display_message:
163-
traced_exception = AirbyteTracedException.from_exception(e, message=display_message)
164+
traced_exception = AirbyteTracedException.from_exception(
165+
e, message=display_message, stream_descriptor=stream_descriptor
166+
)
164167
else:
165-
traced_exception = AirbyteTracedException.from_exception(e)
166-
yield traced_exception.as_sanitized_airbyte_message(
167-
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name)
168-
)
168+
traced_exception = AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)
169+
yield traced_exception.as_sanitized_airbyte_message()
169170
stream_name_to_exception[stream_instance.name] = traced_exception
170171
if self.stop_sync_on_stream_failure:
171172
logger.info(f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}")

airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

+23-12
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,22 @@ def on_partition_complete_sentinel(self, sentinel: PartitionCompleteSentinel) ->
9898
3. Emit messages that were added to the message repository
9999
"""
100100
partition = sentinel.partition
101-
partition.close()
102-
partitions_running = self._streams_to_running_partitions[partition.stream_name()]
103-
if partition in partitions_running:
104-
partitions_running.remove(partition)
105-
# If all partitions were generated and this was the last one, the stream is done
106-
if partition.stream_name() not in self._streams_currently_generating_partitions and len(partitions_running) == 0:
107-
yield from self._on_stream_is_done(partition.stream_name())
108-
yield from self._message_repository.consume_queue()
101+
102+
try:
103+
partition.close()
104+
except Exception as exception:
105+
self._flag_exception(partition.stream_name(), exception)
106+
yield AirbyteTracedException.from_exception(
107+
exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
108+
).as_sanitized_airbyte_message()
109+
finally:
110+
partitions_running = self._streams_to_running_partitions[partition.stream_name()]
111+
if partition in partitions_running:
112+
partitions_running.remove(partition)
113+
# If all partitions were generated and this was the last one, the stream is done
114+
if partition.stream_name() not in self._streams_currently_generating_partitions and len(partitions_running) == 0:
115+
yield from self._on_stream_is_done(partition.stream_name())
116+
yield from self._message_repository.consume_queue()
109117

110118
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
111119
"""
@@ -136,11 +144,14 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
136144
1. Stop all running streams
137145
2. Raise the exception
138146
"""
139-
self._exceptions_per_stream_name.setdefault(exception.stream_name, []).append(exception.exception)
147+
self._flag_exception(exception.stream_name, exception.exception)
140148
self._logger.exception(f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception)
141-
yield AirbyteTracedException.from_exception(exception).as_airbyte_message(
142-
stream_descriptor=StreamDescriptor(name=exception.stream_name)
143-
)
149+
yield AirbyteTracedException.from_exception(
150+
exception, stream_descriptor=StreamDescriptor(name=exception.stream_name)
151+
).as_airbyte_message()
152+
153+
def _flag_exception(self, stream_name: str, exception: Exception) -> None:
154+
self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
144155

145156
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
146157
"""

airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py

+18-8
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,28 @@ def __init__(
3131
message: Optional[str] = None,
3232
failure_type: FailureType = FailureType.system_error,
3333
exception: Optional[BaseException] = None,
34+
stream_descriptor: Optional[StreamDescriptor] = None,
3435
):
3536
"""
3637
:param internal_message: the internal error that caused the failure
3738
:param message: a user-friendly message that indicates the cause of the error
3839
:param failure_type: the type of error
3940
:param exception: the exception that caused the error, from which the stack trace should be retrieved
41+
:param stream_descriptor: describes the stream from which the exception originates
4042
"""
4143
self.internal_message = internal_message
4244
self.message = message
4345
self.failure_type = failure_type
4446
self._exception = exception
47+
self._stream_descriptor = stream_descriptor
4548
super().__init__(internal_message)
4649

47-
def as_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
50+
def as_airbyte_message(self, stream_descriptor: Optional[StreamDescriptor] = None) -> AirbyteMessage:
4851
"""
4952
Builds an AirbyteTraceMessage from the exception
53+
54+
:param stream_descriptor: deprecated, please pass the stream_descriptor to `__init__` or `from_exception` instead. If both
55+
stream_descriptors are defined, the one passed to `as_airbyte_message` will be discarded.
5056
"""
5157
now_millis = datetime.now().timestamp() * 1000.0
5258

@@ -61,18 +67,18 @@ def as_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> Airb
6167
internal_message=self.internal_message,
6268
failure_type=self.failure_type,
6369
stack_trace=stack_trace_str,
64-
stream_descriptor=stream_descriptor,
70+
stream_descriptor=self._stream_descriptor if self._stream_descriptor is not None else stream_descriptor,
6571
),
6672
)
6773

6874
return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
6975

70-
def as_connection_status_message(self) -> AirbyteMessage:
76+
def as_connection_status_message(self) -> Optional[AirbyteMessage]:
7177
if self.failure_type == FailureType.config_error:
72-
output_message = AirbyteMessage(
78+
return AirbyteMessage(
7379
type=MessageType.CONNECTION_STATUS, connectionStatus=AirbyteConnectionStatus(status=Status.FAILED, message=self.message)
7480
)
75-
return output_message
81+
return None
7682

7783
def emit_message(self) -> None:
7884
"""
@@ -84,16 +90,20 @@ def emit_message(self) -> None:
8490
print(filtered_message)
8591

8692
@classmethod
87-
def from_exception(cls, exc: BaseException, *args, **kwargs) -> "AirbyteTracedException": # type: ignore # ignoring because of args and kwargs
93+
def from_exception(cls, exc: BaseException, stream_descriptor: Optional[StreamDescriptor] = None, *args, **kwargs) -> "AirbyteTracedException": # type: ignore # ignoring because of args and kwargs
8894
"""
8995
Helper to create an AirbyteTracedException from an existing exception
9096
:param exc: the exception that caused the error
97+
:param stream_descriptor: describes the stream from which the exception originates
9198
"""
92-
return cls(internal_message=str(exc), exception=exc, *args, **kwargs) # type: ignore # ignoring because of args and kwargs
99+
return cls(internal_message=str(exc), exception=exc, stream_descriptor=stream_descriptor, *args, **kwargs) # type: ignore # ignoring because of args and kwargs
93100

94-
def as_sanitized_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
101+
def as_sanitized_airbyte_message(self, stream_descriptor: Optional[StreamDescriptor] = None) -> AirbyteMessage:
95102
"""
96103
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
104+
105+
:param stream_descriptor: deprecated, please pass the stream_descriptor to `__init__` or `from_exception` instead. If both
106+
stream_descriptors are defined, the one passed to `as_sanitized_airbyte_message` will be discarded.
97107
"""
98108
error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
99109
if error_message.trace.error.message:

airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

+35
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,41 @@ def test_handle_on_partition_complete_sentinel_yields_status_message_if_the_stre
270270
assert messages == expected_messages
271271
self._a_closed_partition.close.assert_called_once()
272272

273+
@freezegun.freeze_time("2020-01-01T00:00:00")
274+
def test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete(self) -> None:
275+
self._a_closed_partition.stream_name.return_value = self._stream.name
276+
self._a_closed_partition.close.side_effect = ValueError
277+
278+
handler = ConcurrentReadProcessor(
279+
[self._stream],
280+
self._partition_enqueuer,
281+
self._thread_pool_manager,
282+
self._logger,
283+
self._slice_logger,
284+
self._message_repository,
285+
self._partition_reader,
286+
)
287+
handler.start_next_partition_generator()
288+
handler.on_partition(self._a_closed_partition)
289+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._stream)))
290+
messages = list(handler.on_partition_complete_sentinel(PartitionCompleteSentinel(self._a_closed_partition)))
291+
292+
expected_status_message = AirbyteMessage(
293+
type=MessageType.TRACE,
294+
trace=AirbyteTraceMessage(
295+
type=TraceType.STREAM_STATUS,
296+
stream_status=AirbyteStreamStatusTraceMessage(
297+
stream_descriptor=StreamDescriptor(
298+
name=self._stream.name,
299+
),
300+
status=AirbyteStreamStatus.INCOMPLETE,
301+
),
302+
emitted_at=1577836800000.0,
303+
),
304+
)
305+
assert list(map(lambda message: message.trace.type, messages)) == [TraceType.ERROR, TraceType.STREAM_STATUS]
306+
assert messages[1] == expected_status_message
307+
273308
@freezegun.freeze_time("2020-01-01T00:00:00")
274309
def test_handle_on_partition_complete_sentinel_yields_no_status_message_if_the_stream_is_not_done(self):
275310
stream_instances_to_read_from = [self._stream]

airbyte-cdk/python/unit_tests/utils/test_traced_exception.py

+29
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
)
1616
from airbyte_cdk.models.airbyte_protocol import Type as MessageType
1717
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
18+
from airbyte_protocol.models import StreamDescriptor
19+
20+
_AN_EXCEPTION = ValueError("An exception")
21+
_A_STREAM_DESCRIPTOR = StreamDescriptor(name="a_stream")
22+
_ANOTHER_STREAM_DESCRIPTOR = StreamDescriptor(name="another_stream")
1823

1924

2025
@pytest.fixture
@@ -105,3 +110,27 @@ def test_emit_message(capsys):
105110
printed_message.trace.emitted_at = 0.0
106111

107112
assert printed_message == expected_message
113+
114+
115+
def test_given_both_init_and_as_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> None:
116+
traced_exc = AirbyteTracedException(stream_descriptor=_A_STREAM_DESCRIPTOR)
117+
message = traced_exc.as_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR)
118+
assert message.trace.error.stream_descriptor == _A_STREAM_DESCRIPTOR
119+
120+
121+
def test_given_both_init_and_as_sanitized_airbyte_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> None:
122+
traced_exc = AirbyteTracedException(stream_descriptor=_A_STREAM_DESCRIPTOR)
123+
message = traced_exc.as_sanitized_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR)
124+
assert message.trace.error.stream_descriptor == _A_STREAM_DESCRIPTOR
125+
126+
127+
def test_given_both_from_exception_and_as_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> None:
128+
traced_exc = AirbyteTracedException.from_exception(_AN_EXCEPTION, stream_descriptor=_A_STREAM_DESCRIPTOR)
129+
message = traced_exc.as_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR)
130+
assert message.trace.error.stream_descriptor == _A_STREAM_DESCRIPTOR
131+
132+
133+
def test_given_both_from_exception_and_as_sanitized_airbyte_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> None:
134+
traced_exc = AirbyteTracedException.from_exception(_AN_EXCEPTION, stream_descriptor=_A_STREAM_DESCRIPTOR)
135+
message = traced_exc.as_sanitized_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR)
136+
assert message.trace.error.stream_descriptor == _A_STREAM_DESCRIPTOR

0 commit comments

Comments
 (0)