Skip to content

Commit b49c500

Browse files
authored
Break Python application with status 1 on exception (#37390)
1 parent 60575cc commit b49c500

File tree

8 files changed

+60
-20
lines changed

8 files changed

+60
-20
lines changed

airbyte-cdk/python/airbyte_cdk/exception_handler.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
import logging
66
import sys
77
from types import TracebackType
8-
from typing import Any, Optional
8+
from typing import Any, List, Mapping, Optional
99

10+
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
1011
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1112

1213

@@ -36,3 +37,10 @@ def hook_fn(exception_type: type[BaseException], exception_value: BaseException,
3637
traced_exc.emit_message()
3738

3839
sys.excepthook = hook_fn
40+
41+
42+
def generate_failed_streams_error_message(stream_failures: Mapping[str, List[Exception]]) -> str:
43+
failures = "\n".join(
44+
[f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exceptions in stream_failures.items() for exception in exceptions]
45+
)
46+
return f"During the sync, the following streams did not sync successfully: {failures}"

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from abc import ABC, abstractmethod
77
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
88

9+
from airbyte_cdk.exception_handler import generate_failed_streams_error_message
910
from airbyte_cdk.models import (
1011
AirbyteCatalog,
1112
AirbyteConnectionStatus,
@@ -29,7 +30,6 @@
2930
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
3031
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
3132
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
32-
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
3333
from airbyte_cdk.utils.event_timing import create_timer
3434
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
3535
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -177,7 +177,7 @@ def read(
177177
logger.info(timer.report())
178178

179179
if len(stream_name_to_exception) > 0:
180-
error_message = self._generate_failed_streams_error_message(stream_name_to_exception)
180+
error_message = generate_failed_streams_error_message({key: [value] for key, value in stream_name_to_exception.items()}) # type: ignore # for some reason, mypy can't figure out the types for key and value
181181
logger.info(error_message)
182182
# We still raise at least one exception when a stream raises an exception because the platform currently relies
183183
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
@@ -276,11 +276,6 @@ def stop_sync_on_stream_failure(self) -> bool:
276276
"""
277277
return False
278278

279-
@staticmethod
280-
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
281-
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
282-
return f"During the sync, the following streams did not sync successfully: {failures}"
283-
284279
@staticmethod
285280
def _stream_state_is_full_refresh(stream_state: Mapping[str, Any]) -> bool:
286281
# For full refresh syncs that don't have a suitable cursor value, we emit a state that contains a sentinel key.

airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
from typing import Dict, Iterable, List, Optional, Set
66

7+
from airbyte_cdk.exception_handler import generate_failed_streams_error_message
78
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus
89
from airbyte_cdk.models import Type as MessageType
910
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel
@@ -20,7 +21,7 @@
2021
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
2122
from airbyte_cdk.utils import AirbyteTracedException
2223
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
23-
from airbyte_protocol.models import StreamDescriptor
24+
from airbyte_protocol.models import FailureType, StreamDescriptor
2425

2526

2627
class ConcurrentReadProcessor:
@@ -183,7 +184,17 @@ def is_done(self) -> bool:
183184
2. There are no more streams to read from
184185
3. All partitions for all streams are closed
185186
"""
186-
return all([self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance.keys()])
187+
is_done = all([self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance.keys()])
188+
if is_done and self._exceptions_per_stream_name:
189+
error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
190+
self._logger.info(error_message)
191+
# We still raise at least one exception when a stream raises an exception because the platform currently relies
192+
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
193+
# type because this combined error isn't actionable on its own; the previously emitted individual errors are.
194+
raise AirbyteTracedException(
195+
message=error_message, internal_message="Concurrent read failure", failure_type=FailureType.config_error
196+
)
197+
return is_done
187198

188199
def _is_stream_done(self, stream_name: str) -> bool:
189200
return stream_name in self._streams_done

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
55
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType
66
from airbyte_cdk.test.state_builder import StateBuilder
7+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
78
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
89
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
910
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
@@ -36,7 +37,7 @@
3637
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
3738
.set_input_state(_NO_INPUT_STATE)
3839
)
39-
.set_expected_read_error(ValueError, "test exception")
40+
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
4041
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
4142
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=_NO_INPUT_STATE))
4243
.build()
@@ -113,7 +114,7 @@
113114
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
114115
.set_input_state(LEGACY_STATE)
115116
)
116-
.set_expected_read_error(ValueError, "test exception")
117+
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
117118
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
118119
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=LEGACY_STATE))
119120
.build()
@@ -200,7 +201,7 @@
200201
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
201202
.set_input_state(CONCURRENT_STATE)
202203
)
203-
.set_expected_read_error(ValueError, "test exception")
204+
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
204205
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
205206
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=CONCURRENT_STATE))
206207
.build()

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
54
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
5+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
66
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
77
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
88
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
@@ -158,7 +158,7 @@
158158
]
159159
}
160160
)
161-
.set_expected_read_error(StreamThreadException, "Exception while syncing stream stream1: test exception")
161+
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
162162
.build()
163163
)
164164

@@ -442,7 +442,7 @@
442442
)
443443
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
444444
)
445-
.set_expected_read_error(ValueError, "test exception")
445+
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
446446
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
447447
.set_incremental_scenario_config(
448448
IncrementalScenarioConfig(

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
#
44
import logging
55

6-
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
76
from airbyte_cdk.sources.message import InMemoryMessageRepository
87
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
98
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
109
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
10+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1111
from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenarioBuilder
1212
from unit_tests.sources.streams.concurrent.scenarios.thread_based_concurrent_stream_source_builder import (
1313
AlwaysAvailableAvailabilityStrategy,
@@ -302,7 +302,7 @@
302302

303303
test_concurrent_cdk_partition_raises_exception = (
304304
TestScenarioBuilder()
305-
.set_name("test_concurrent_partition_raises_exception")
305+
.set_name("test_concurrent_cdk_partition_raises_exception")
306306
.set_config({})
307307
.set_source_builder(
308308
ConcurrentSourceBuilder()
@@ -318,7 +318,7 @@
318318
{"data": {"id": "1"}, "stream": "stream1"},
319319
]
320320
)
321-
.set_expected_read_error(StreamThreadException, "Exception while syncing stream stream1: test exception")
321+
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
322322
.set_expected_catalog(
323323
{
324324
"streams": [

airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from unittest.mock import Mock, call
77

88
import freezegun
9+
import pytest
910
from airbyte_cdk.models import (
1011
AirbyteLogMessage,
1112
AirbyteMessage,
@@ -30,6 +31,7 @@
3031
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
3132
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
3233
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
34+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
3335

3436
_STREAM_NAME = "stream"
3537
_ANOTHER_STREAM_NAME = "stream2"
@@ -560,6 +562,29 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
560562
),
561563
)
562564
]
565+
with pytest.raises(AirbyteTracedException):
566+
handler.is_done()
567+
568+
def test_given_partition_completion_is_not_success_then_do_not_close_partition(self):
569+
stream_instances_to_read_from = [self._stream, self._another_stream]
570+
571+
handler = ConcurrentReadProcessor(
572+
stream_instances_to_read_from,
573+
self._partition_enqueuer,
574+
self._thread_pool_manager,
575+
self._logger,
576+
self._slice_logger,
577+
self._message_repository,
578+
self._partition_reader,
579+
)
580+
581+
handler.start_next_partition_generator()
582+
handler.on_partition(self._an_open_partition)
583+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._stream)))
584+
585+
list(handler.on_partition_complete_sentinel(PartitionCompleteSentinel(self._an_open_partition, not _IS_SUCCESSFUL)))
586+
587+
assert self._an_open_partition.close.call_count == 0
563588

564589
def test_given_partition_completion_is_not_success_then_do_not_close_partition(self):
565590
stream_instances_to_read_from = [self._stream, self._another_stream]

airbyte-cdk/python/unit_tests/sources/test_source_read.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_an_e
301301
config = {}
302302
catalog = _create_configured_catalog(source._streams)
303303
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, AirbyteTracedException)
304-
messages_from_concurrent_source = _read_from_source(concurrent_source, logger, config, catalog, state, RuntimeError)
304+
messages_from_concurrent_source = _read_from_source(concurrent_source, logger, config, catalog, state, AirbyteTracedException)
305305

306306
_assert_status_messages(messages_from_abstract_source, messages_from_concurrent_source)
307307
_assert_record_messages(messages_from_abstract_source, messages_from_concurrent_source)

0 commit comments

Comments
 (0)