Skip to content

Break Python application with status 1 on exception #37390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Dict, Iterable, List, Optional, Set
from typing import Dict, Iterable, List, Optional, Set, Mapping

from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus
from airbyte_cdk.models import Type as MessageType
Expand All @@ -19,8 +19,9 @@
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_protocol.models import StreamDescriptor
from airbyte_protocol.models import StreamDescriptor, FailureType


class ConcurrentReadProcessor:
Expand Down Expand Up @@ -100,7 +101,8 @@ def on_partition_complete_sentinel(self, sentinel: PartitionCompleteSentinel) ->
partition = sentinel.partition

try:
partition.close()
if sentinel.is_successful:
partition.close()
except Exception as exception:
self._flag_exception(partition.stream_name(), exception)
yield AirbyteTracedException.from_exception(
Expand Down Expand Up @@ -174,6 +176,11 @@ def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
else:
return None

@staticmethod
def _generate_failed_streams_error_message(stream_failures: Mapping[str, List[Exception]]) -> str:
    """
    Build one summary message listing every exception recorded for every stream.

    :param stream_failures: Mapping of stream name to the exceptions collected for that stream
    :return: A message containing one "<stream>: <exception repr>" entry per exception, comma-separated
    """
    # Each exception repr is passed through filter_secrets before being embedded in the message.
    failures = ", ".join(
        f"{stream}: {filter_secrets(repr(exception))}"
        for stream, exceptions in stream_failures.items()
        for exception in exceptions
    )
    return f"During the sync, the following streams did not sync successfully: {failures}"

def is_done(self) -> bool:
"""
This method is called to check if the sync is done.
Expand All @@ -182,7 +189,15 @@ def is_done(self) -> bool:
2. There are no more streams to read from
3. All partitions for all streams are closed
"""
return all([self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance.keys()])
is_done = all([self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance.keys()])
if is_done and self._exceptions_per_stream_name:
error_message = self._generate_failed_streams_error_message(self._exceptions_per_stream_name)
self._logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform currently relies
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
# type because this combined error isn't actionable, but rather the previously emitted individual errors.
raise AirbyteTracedException(message=error_message, internal_message="Concurrent read failure", failure_type=FailureType.config_error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having an internal_message differs from abstract_source. I had to add an internal message because the scenario framework asserts on the internal_message; leaving it empty would raise "'NoneType' object is not subscriptable".

return is_done

def _is_stream_done(self, stream_name: str) -> bool:
return stream_name in self._streams_done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class PartitionReader:
Generates records from a partition and puts them in a queue.
"""

_IS_SUCCESSFUL = True

def __init__(self, queue: Queue[QueueItem]) -> None:
"""
:param queue: The queue to put the records in.
Expand All @@ -34,7 +36,7 @@ def process_partition(self, partition: Partition) -> None:
try:
for record in partition.read():
self._queue.put(record)
self._queue.put(PartitionCompleteSentinel(partition))
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
except Exception as e:
self._queue.put(StreamThreadException(e, partition.stream_name()))
self._queue.put(PartitionCompleteSentinel(partition))
self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ class PartitionCompleteSentinel:
Includes a pointer to the partition that was processed.
"""

def __init__(self, partition: Partition):
def __init__(self, partition: Partition, is_successful: bool = True):
    """
    :param partition: The partition that was processed
    :param is_successful: Whether processing of the partition succeeded (defaults to True)
    """
    self.partition = partition
    self.is_successful = is_successful

def __eq__(self, other: Any) -> bool:
if isinstance(other, PartitionCompleteSentinel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
Expand Down Expand Up @@ -36,7 +37,7 @@
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
.set_input_state(_NO_INPUT_STATE)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=_NO_INPUT_STATE))
.build()
Expand Down Expand Up @@ -113,7 +114,7 @@
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
.set_input_state(LEGACY_STATE)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=LEGACY_STATE))
.build()
Expand Down Expand Up @@ -200,7 +201,7 @@
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
.set_input_state(CONCURRENT_STATE)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(IncrementalScenarioConfig(input_state=CONCURRENT_STATE))
.build()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
Expand Down Expand Up @@ -158,7 +158,7 @@
]
}
)
.set_expected_read_error(StreamThreadException, "Exception while syncing stream stream1: test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.build()
)

Expand Down Expand Up @@ -442,7 +442,7 @@
)
.set_incremental(CursorField("cursor_field"), _NO_SLICE_BOUNDARIES)
)
.set_expected_read_error(ValueError, "test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
.set_incremental_scenario_config(
IncrementalScenarioConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#
import logging

from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.thread_based_concurrent_stream_source_builder import (
AlwaysAvailableAvailabilityStrategy,
Expand Down Expand Up @@ -302,7 +302,7 @@

test_concurrent_cdk_partition_raises_exception = (
TestScenarioBuilder()
.set_name("test_concurrent_partition_raises_exception")
.set_name("test_concurrent_cdk_partition_raises_exception")
.set_config({})
.set_source_builder(
ConcurrentSourceBuilder()
Expand All @@ -318,7 +318,7 @@
{"data": {"id": "1"}, "stream": "stream1"},
]
)
.set_expected_read_error(StreamThreadException, "Exception while syncing stream stream1: test exception")
.set_expected_read_error(AirbyteTracedException, "Concurrent read failure")
.set_expected_catalog(
{
"streams": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from unittest.mock import Mock, call

import freezegun
import pytest
from airbyte_cdk.models import (
AirbyteLogMessage,
AirbyteMessage,
Expand All @@ -30,10 +31,12 @@
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

_STREAM_NAME = "stream"
_ANOTHER_STREAM_NAME = "stream2"
_ANY_AIRBYTE_MESSAGE = Mock(spec=AirbyteMessage)
_IS_SUCCESSFUL = True


class TestConcurrentReadProcessor(unittest.TestCase):
Expand Down Expand Up @@ -559,6 +562,29 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
),
)
]
with pytest.raises(AirbyteTracedException):
handler.is_done()

def test_given_partition_completion_is_not_success_then_do_not_close_partition(self):
    """A completion sentinel flagged as unsuccessful must not trigger `close()` on its partition."""
    processor = ConcurrentReadProcessor(
        [self._stream, self._another_stream],
        self._partition_enqueuer,
        self._thread_pool_manager,
        self._logger,
        self._slice_logger,
        self._message_repository,
        self._partition_reader,
    )

    processor.start_next_partition_generator()
    processor.on_partition(self._an_open_partition)

    # Wrap each call in list() so the returned iterable is fully consumed.
    generation_completed = PartitionGenerationCompletedSentinel(self._stream)
    list(processor.on_partition_generation_completed(generation_completed))

    unsuccessful_completion = PartitionCompleteSentinel(self._an_open_partition, not _IS_SUCCESSFUL)
    list(processor.on_partition_complete_sentinel(unsuccessful_completion))

    assert self._an_open_partition.close.call_count == 0

def test_is_done_is_false_if_there_are_any_instances_to_read_from(self):
stream_instances_to_read_from = [self._stream]
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/unit_tests/sources/test_source_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_an_e
config = {}
catalog = _create_configured_catalog(source._streams)
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, AirbyteTracedException)
messages_from_concurrent_source = _read_from_source(concurrent_source, logger, config, catalog, state, RuntimeError)
messages_from_concurrent_source = _read_from_source(concurrent_source, logger, config, catalog, state, AirbyteTracedException)

_assert_status_messages(messages_from_abstract_source, messages_from_concurrent_source)
_assert_record_messages(messages_from_abstract_source, messages_from_concurrent_source)
Expand Down