Skip to content

Commit 45d5198

Browse files
authored
[airbyte-cdk] refactor error handling in abstract source (#40571)
1 parent c67c5ba commit 45d5198

File tree

1 file changed

+21
-21
lines changed

1 file changed

+21
-21
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

+21-21
Original file line numberDiff line numberDiff line change
@@ -139,35 +139,26 @@ def read(
139139
)
140140
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
141141
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.COMPLETE)
142-
except AirbyteTracedException as e:
143-
yield from self._emit_queued_messages()
144-
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
145-
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
146-
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
147-
yield e.as_sanitized_airbyte_message(stream_descriptor=StreamDescriptor(name=configured_stream.stream.name))
148-
stream_name_to_exception[stream_instance.name] = e # type: ignore # use configured_stream if stream_instance is None
149-
if self.stop_sync_on_stream_failure:
150-
logger.info(
151-
f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
152-
)
153-
break
142+
154143
except Exception as e:
155144
yield from self._emit_queued_messages()
156145
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
157146
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
158147
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
159-
display_message = stream_instance.get_error_display_message(e) # type: ignore[union-attr]
148+
160149
stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
161-
if display_message:
162-
traced_exception = AirbyteTracedException.from_exception(
163-
e, message=display_message, stream_descriptor=stream_descriptor
164-
)
150+
151+
if isinstance(e, AirbyteTracedException):
152+
traced_exception = e
153+
info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
165154
else:
166-
traced_exception = AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)
167-
yield traced_exception.as_sanitized_airbyte_message()
168-
stream_name_to_exception[stream_instance.name] = traced_exception # type: ignore
155+
traced_exception = self._serialize_exception(stream_descriptor, e, stream_instance=stream_instance)
156+
info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"
157+
158+
yield traced_exception.as_sanitized_airbyte_message(stream_descriptor=stream_descriptor)
159+
stream_name_to_exception[stream_instance.name] = traced_exception # type: ignore # use configured_stream if stream_instance is None
169160
if self.stop_sync_on_stream_failure:
170-
logger.info(f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}")
161+
logger.info(info_message)
171162
break
172163
finally:
173164
# Finish read event only if the stream instance exists;
@@ -186,6 +177,15 @@ def read(
186177
raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)
187178
logger.info(f"Finished syncing {self.name}")
188179

180+
@staticmethod
181+
def _serialize_exception(
182+
stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
183+
) -> AirbyteTracedException:
184+
display_message = stream_instance.get_error_display_message(e) if stream_instance else None
185+
if display_message:
186+
return AirbyteTracedException.from_exception(e, message=display_message, stream_descriptor=stream_descriptor)
187+
return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)
188+
189189
@property
190190
def raise_exception_on_missing_stream(self) -> bool:
191191
return True

0 commit comments

Comments
 (0)