Skip to content

Commit 40d1b17

Browse files
brianjlaixiaohansong
authored andcommitted
Emit multiple error trace messages and continue syncs by default (#35129)
1 parent 72b7aaf commit 40d1b17

File tree

5 files changed

+249
-52
lines changed

5 files changed

+249
-52
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
AirbyteStreamStatus,
1515
ConfiguredAirbyteCatalog,
1616
ConfiguredAirbyteStream,
17+
FailureType,
1718
Status,
19+
StreamDescriptor,
1820
SyncMode,
1921
)
2022
from airbyte_cdk.models import Type as MessageType
@@ -27,6 +29,7 @@
2729
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
2830
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
2931
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
32+
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
3033
from airbyte_cdk.utils.event_timing import create_timer
3134
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
3235
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -133,27 +136,45 @@ def read(
133136
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
134137
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.COMPLETE)
135138
except AirbyteTracedException as e:
139+
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
140+
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
136141
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
137-
if self.continue_sync_on_stream_failure:
138-
stream_name_to_exception[stream_instance.name] = e
139-
else:
140-
raise e
142+
yield e.as_sanitized_airbyte_message(stream_descriptor=StreamDescriptor(name=configured_stream.stream.name))
143+
stream_name_to_exception[stream_instance.name] = e
144+
if self.stop_sync_on_stream_failure:
145+
logger.info(
146+
f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
147+
)
148+
break
141149
except Exception as e:
142150
yield from self._emit_queued_messages()
143151
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
144152
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
145153
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
146154
display_message = stream_instance.get_error_display_message(e)
147155
if display_message:
148-
raise AirbyteTracedException.from_exception(e, message=display_message) from e
149-
raise e
156+
traced_exception = AirbyteTracedException.from_exception(e, message=display_message)
157+
else:
158+
traced_exception = AirbyteTracedException.from_exception(e)
159+
yield traced_exception.as_sanitized_airbyte_message(
160+
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name)
161+
)
162+
stream_name_to_exception[stream_instance.name] = traced_exception
163+
if self.stop_sync_on_stream_failure:
164+
logger.info(f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}")
165+
break
150166
finally:
151167
timer.finish_event()
152168
logger.info(f"Finished syncing {configured_stream.stream.name}")
153169
logger.info(timer.report())
154170

155-
if self.continue_sync_on_stream_failure and len(stream_name_to_exception) > 0:
156-
raise AirbyteTracedException(message=self._generate_failed_streams_error_message(stream_name_to_exception))
171+
if len(stream_name_to_exception) > 0:
172+
error_message = self._generate_failed_streams_error_message(stream_name_to_exception)
173+
logger.info(error_message)
174+
# We still raise at least one exception when a stream raises an exception because the platform currently relies
175+
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
176+
# type because this combined error isn't actionable, but rather the previously emitted individual errors.
177+
raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)
157178
logger.info(f"Finished syncing {self.name}")
158179

159180
@property
@@ -282,17 +303,17 @@ def message_repository(self) -> Union[None, MessageRepository]:
282303
return _default_message_repository
283304

284305
@property
285-
def continue_sync_on_stream_failure(self) -> bool:
306+
def stop_sync_on_stream_failure(self) -> bool:
286307
"""
287308
WARNING: This function is in-development which means it is subject to change. Use at your own risk.
288309
289-
By default, a source should raise an exception and stop the sync when it encounters an error while syncing a stream. This
290-
method can be overridden on a per-source basis so that a source will continue syncing streams other streams even if an
291-
exception is raised for a stream.
310+
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
311+
continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync
312+
on the first error seen and emit a single error trace message for that stream.
292313
"""
293314
return False
294315

295316
@staticmethod
296317
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
297-
failures = ", ".join([f"{stream}: {exception.__repr__()}" for stream, exception in stream_failures.items()])
318+
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
298319
return f"During the sync, the following streams did not sync successfully: {failures}"

airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
AirbyteTraceMessage,
1414
FailureType,
1515
Status,
16+
StreamDescriptor,
1617
TraceType,
1718
)
1819
from airbyte_cdk.models import Type as MessageType
@@ -43,7 +44,7 @@ def __init__(
4344
self._exception = exception
4445
super().__init__(internal_message)
4546

46-
def as_airbyte_message(self) -> AirbyteMessage:
47+
def as_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
4748
"""
4849
Builds an AirbyteTraceMessage from the exception
4950
"""
@@ -60,6 +61,7 @@ def as_airbyte_message(self) -> AirbyteMessage:
6061
internal_message=self.internal_message,
6162
failure_type=self.failure_type,
6263
stack_trace=stack_trace_str,
64+
stream_descriptor=stream_descriptor,
6365
),
6466
)
6567

@@ -88,3 +90,16 @@ def from_exception(cls, exc: BaseException, *args, **kwargs) -> "AirbyteTracedEx
8890
:param exc: the exception that caused the error
8991
"""
9092
return cls(internal_message=str(exc), exception=exc, *args, **kwargs) # type: ignore # ignoring because of args and kwargs
93+
94+
def as_sanitized_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
95+
"""
96+
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
97+
"""
98+
error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
99+
if error_message.trace.error.message:
100+
error_message.trace.error.message = filter_secrets(error_message.trace.error.message)
101+
if error_message.trace.error.internal_message:
102+
error_message.trace.error.internal_message = filter_secrets(error_message.trace.error.internal_message)
103+
if error_message.trace.error.stack_trace:
104+
error_message.trace.error.stack_trace = filter_secrets(error_message.trace.error.stack_trace)
105+
return error_message

0 commit comments

Comments
 (0)