Skip to content

Break Python application with status 1 on exception #37390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import logging
import sys
from types import TracebackType
from typing import Any, Optional
from typing import Any, List, Mapping, Optional

from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


Expand Down Expand Up @@ -36,3 +37,10 @@ def hook_fn(exception_type: type[BaseException], exception_value: BaseException,
traced_exc.emit_message()

sys.excepthook = hook_fn


def generate_failed_streams_error_message(stream_failures: Mapping[str, List[Exception]]) -> str:
failures = "\n".join(
[f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exceptions in stream_failures.items() for exception in exceptions]
)
return f"During the sync, the following streams did not sync successfully: {failures}"
9 changes: 2 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
Expand All @@ -29,7 +30,6 @@
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -177,7 +177,7 @@ def read(
logger.info(timer.report())

if len(stream_name_to_exception) > 0:
error_message = self._generate_failed_streams_error_message(stream_name_to_exception)
error_message = generate_failed_streams_error_message({key: [value] for key, value in stream_name_to_exception.items()}) # type: ignore # for some reason, mypy can't figure out the types for key and value
logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform currently relies
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
Expand Down Expand Up @@ -276,11 +276,6 @@ def stop_sync_on_stream_failure(self) -> bool:
"""
return False

@staticmethod
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
return f"During the sync, the following streams did not sync successfully: {failures}"

@staticmethod
def _stream_state_is_full_refresh(stream_state: Mapping[str, Any]) -> bool:
# For full refresh syncs that don't have a suitable cursor value, we emit a state that contains a sentinel key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel
Expand All @@ -20,7 +21,7 @@
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_protocol.models import StreamDescriptor
from airbyte_protocol.models import FailureType, StreamDescriptor


class ConcurrentReadProcessor:
Expand Down Expand Up @@ -183,7 +184,17 @@ def is_done(self) -> bool:
2. There are no more streams to read from
3. All partitions for all streams are closed
"""
return all([self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance.keys()])
is_done = all([self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance.keys()])
if is_done and self._exceptions_per_stream_name:
error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
self._logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform currently relies
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
# type because this combined error isn't actionable, but rather the previously emitted individual errors.
raise AirbyteTracedException(
message=error_message, internal_message="Concurrent read failure", failure_type=FailureType.config_error
)
return is_done

def _is_stream_done(self, stream_name: str) -> bool:
return stream_name in self._streams_done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
Expand Down Expand Up @@ -36,7 +37,7 @@
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
.set_input_state(_NO_INPUT_STATE)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=_NO_INPUT_STATE))
.build()
Expand Down Expand Up @@ -113,7 +114,7 @@
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
.set_input_state(LEGACY_STATE)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=LEGACY_STATE))
.build()
Expand Down Expand Up @@ -200,7 +201,7 @@
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
.set_input_state(CONCURRENT_STATE)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=CONCURRENT_STATE))
.build()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
Expand Down Expand Up @@ -158,7 +158,7 @@
]
}
)
.set_expected_read_error(StreamThreadException, "Exception while syncing stream stream1: test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.build()
)

Expand Down Expand Up @@ -442,7 +442,7 @@
)
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(
IncrementalScenarioConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#
import logging

from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.thread_based_concurrent_stream_source_builder import (
AlwaysAvailableAvailabilityStrategy,
Expand Down Expand Up @@ -302,7 +302,7 @@

test_concurrent_cdk_partition_raises_exception = (
TestScenarioBuilder()
.set_name("test_concurrent_partition_raises_exception")
.set_name("test_concurrent_cdk_partition_raises_exception")
.set_config({})
.set_source_builder(
ConcurrentSourceBuilder()
Expand All @@ -318,7 +318,7 @@
{"data": {"id": "1"}, "stream": "stream1"},
]
)
.set_expected_read_error(StreamThreadException, "Exception while syncing stream stream1: test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_expected_catalog(
{
"streams": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from unittest.mock import Mock, call

import freezegun
import pytest
from airbyte_cdk.models import (
AirbyteLogMessage,
AirbyteMessage,
Expand All @@ -30,6 +31,7 @@
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

_STREAM_NAME = "stream"
_ANOTHER_STREAM_NAME = "stream2"
Expand Down Expand Up @@ -560,6 +562,29 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
),
)
]
with pytest.raises(AirbyteTracedException):
handler.is_done()

def test_given_partition_completion_is_not_success_then_do_not_close_partition(self):
stream_instances_to_read_from = [self._stream, self._another_stream]

handler = ConcurrentReadProcessor(
stream_instances_to_read_from,
self._partition_enqueuer,
self._thread_pool_manager,
self._logger,
self._slice_logger,
self._message_repository,
self._partition_reader,
)

handler.start_next_partition_generator()
handler.on_partition(self._an_open_partition)
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._stream)))

list(handler.on_partition_complete_sentinel(PartitionCompleteSentinel(self._an_open_partition, not _IS_SUCCESSFUL)))

assert self._an_open_partition.close.call_count == 0

def test_given_partition_completion_is_not_success_then_do_not_close_partition(self):
stream_instances_to_read_from = [self._stream, self._another_stream]
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/unit_tests/sources/test_source_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_an_e
config = {}
catalog = _create_configured_catalog(source._streams)
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, AirbyteTracedException)
messages_from_concurrent_source = _read_from_source(concurrent_source, logger, config, catalog, state, RuntimeError)
messages_from_concurrent_source = _read_from_source(concurrent_source, logger, config, catalog, state, AirbyteTracedException)

_assert_status_messages(messages_from_abstract_source, messages_from_concurrent_source)
_assert_record_messages(messages_from_abstract_source, messages_from_concurrent_source)
Expand Down