Skip to content

Commit 991a552

Browse files
brianjlaixiaohansong
authored andcommitted
Emit multiple error trace messages and continue syncs by default (#34636)
1 parent 119e287 commit 991a552

File tree

5 files changed

+243
-52
lines changed

5 files changed

+243
-52
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
ConfiguredAirbyteCatalog,
1616
ConfiguredAirbyteStream,
1717
Status,
18+
StreamDescriptor,
1819
SyncMode,
1920
)
2021
from airbyte_cdk.models import Type as MessageType
@@ -27,6 +28,7 @@
2728
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
2829
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
2930
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
31+
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
3032
from airbyte_cdk.utils.event_timing import create_timer
3133
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
3234
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -133,27 +135,44 @@ def read(
133135
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
134136
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.COMPLETE)
135137
except AirbyteTracedException as e:
138+
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
139+
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
136140
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
137-
if self.continue_sync_on_stream_failure:
138-
stream_name_to_exception[stream_instance.name] = e
139-
else:
140-
raise e
141+
yield e.as_sanitized_airbyte_message(stream_descriptor=StreamDescriptor(name=configured_stream.stream.name))
142+
stream_name_to_exception[stream_instance.name] = e
143+
if self.stop_sync_on_stream_failure:
144+
logger.info(
145+
f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
146+
)
147+
break
141148
except Exception as e:
142149
yield from self._emit_queued_messages()
143150
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
144151
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
145152
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
146153
display_message = stream_instance.get_error_display_message(e)
147154
if display_message:
148-
raise AirbyteTracedException.from_exception(e, message=display_message) from e
149-
raise e
155+
traced_exception = AirbyteTracedException.from_exception(e, message=display_message)
156+
else:
157+
traced_exception = AirbyteTracedException.from_exception(e)
158+
yield traced_exception.as_sanitized_airbyte_message(
159+
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name)
160+
)
161+
stream_name_to_exception[stream_instance.name] = traced_exception
162+
if self.stop_sync_on_stream_failure:
163+
logger.info(f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}")
164+
break
150165
finally:
151166
timer.finish_event()
152167
logger.info(f"Finished syncing {configured_stream.stream.name}")
153168
logger.info(timer.report())
154169

155-
if self.continue_sync_on_stream_failure and len(stream_name_to_exception) > 0:
156-
raise AirbyteTracedException(message=self._generate_failed_streams_error_message(stream_name_to_exception))
170+
if len(stream_name_to_exception) > 0:
171+
error_message = self._generate_failed_streams_error_message(stream_name_to_exception)
172+
logger.info(error_message)
173+
# We still raise at least one exception when a stream raises an exception because the platform
174+
# currently relies on a non-zero exit code to determine if a sync attempt has failed
175+
raise AirbyteTracedException(message=error_message)
157176
logger.info(f"Finished syncing {self.name}")
158177

159178
@property
@@ -282,17 +301,17 @@ def message_repository(self) -> Union[None, MessageRepository]:
282301
return _default_message_repository
283302

284303
@property
285-
def continue_sync_on_stream_failure(self) -> bool:
304+
def stop_sync_on_stream_failure(self) -> bool:
286305
"""
287306
WARNING: This function is in-development which means it is subject to change. Use at your own risk.
288307
289-
By default, a source should raise an exception and stop the sync when it encounters an error while syncing a stream. This
290-
method can be overridden on a per-source basis so that a source will continue syncing streams other streams even if an
291-
exception is raised for a stream.
308+
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
309+
continue syncing the next stream. This can be overwridden on a per-source basis so that the source will stop the sync
310+
on the first error seen and emit a single error trace message for that stream.
292311
"""
293312
return False
294313

295314
@staticmethod
296315
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
297-
failures = ", ".join([f"{stream}: {exception.__repr__()}" for stream, exception in stream_failures.items()])
316+
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
298317
return f"During the sync, the following streams did not sync successfully: {failures}"

airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
AirbyteTraceMessage,
1414
FailureType,
1515
Status,
16+
StreamDescriptor,
1617
TraceType,
1718
)
1819
from airbyte_cdk.models import Type as MessageType
@@ -43,7 +44,7 @@ def __init__(
4344
self._exception = exception
4445
super().__init__(internal_message)
4546

46-
def as_airbyte_message(self) -> AirbyteMessage:
47+
def as_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
4748
"""
4849
Builds an AirbyteTraceMessage from the exception
4950
"""
@@ -60,6 +61,7 @@ def as_airbyte_message(self) -> AirbyteMessage:
6061
internal_message=self.internal_message,
6162
failure_type=self.failure_type,
6263
stack_trace=stack_trace_str,
64+
stream_descriptor=stream_descriptor,
6365
),
6466
)
6567

@@ -88,3 +90,16 @@ def from_exception(cls, exc: BaseException, *args, **kwargs) -> "AirbyteTracedEx
8890
:param exc: the exception that caused the error
8991
"""
9092
return cls(internal_message=str(exc), exception=exc, *args, **kwargs) # type: ignore # ignoring because of args and kwargs
93+
94+
def as_sanitized_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
95+
"""
96+
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
97+
"""
98+
error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
99+
if error_message.trace.error.message:
100+
error_message.trace.error.message = filter_secrets(error_message.trace.error.message)
101+
if error_message.trace.error.internal_message:
102+
error_message.trace.error.internal_message = filter_secrets(error_message.trace.error.internal_message)
103+
if error_message.trace.error.stack_trace:
104+
error_message.trace.error.stack_trace = filter_secrets(error_message.trace.error.stack_trace)
105+
return error_message

0 commit comments

Comments
 (0)