|
5 | 5 | from abc import ABC
|
6 | 6 | from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Union
|
7 | 7 |
|
8 |
| -from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog |
| 8 | +from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, FailureType |
9 | 9 | from airbyte_cdk.sources import AbstractSource
|
10 | 10 | from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
|
11 | 11 | from airbyte_cdk.sources.streams import Stream
|
12 | 12 | from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
|
13 | 13 | from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
|
| 14 | +from airbyte_cdk.utils.traced_exception import AirbyteTracedException |
14 | 15 |
|
15 | 16 |
|
16 | 17 | class ConcurrentSourceAdapter(AbstractSource, ABC):
|
@@ -54,10 +55,18 @@ def _select_abstract_streams(self, config: Mapping[str, Any], configured_catalog
|
54 | 55 | if not stream_instance:
|
55 | 56 | if not self.raise_exception_on_missing_stream:
|
56 | 57 | continue
|
57 |
| - raise KeyError( |
58 |
| - f"The stream {configured_stream.stream.name} no longer exists in the configuration. " |
59 |
| - f"Refresh the schema in replication settings and remove this stream from future sync attempts." |
| 58 | + |
| 59 | + error_message = ( |
| 60 | + f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. " |
| 61 | + f"Refresh the schema in your replication settings and remove this stream from future sync attempts." |
| 62 | + ) |
| 63 | + |
| 64 | + raise AirbyteTracedException( |
| 65 | + message="A stream listed in your configuration was not found in the source. Please check the logs for more details.", |
| 66 | + internal_message=error_message, |
| 67 | + failure_type=FailureType.config_error, |
60 | 68 | )
|
| 69 | + |
61 | 70 | if isinstance(stream_instance, AbstractStreamFacade):
|
62 | 71 | abstract_streams.append(stream_instance.get_underlying_stream())
|
63 | 72 | return abstract_streams
|
0 commit comments