Skip to content

Commit cc4f2e3

Browse files
clnoll authored and jatinyadav-cc committed
File-based CDK: make full refresh concurrent (airbytehq#34411)
1 parent d65406a commit cc4f2e3

24 files changed

+1042
-195
lines changed

airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source_adapter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
1111
from airbyte_cdk.sources.streams import Stream
1212
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
13-
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
13+
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
1414

1515

1616
class ConcurrentSourceAdapter(AbstractSource, ABC):
@@ -58,6 +58,6 @@ def _select_abstract_streams(self, config: Mapping[str, Any], configured_catalog
5858
f"The stream {configured_stream.stream.name} no longer exists in the configuration. "
5959
f"Refresh the schema in replication settings and remove this stream from future sync attempts."
6060
)
61-
if isinstance(stream_instance, StreamFacade):
62-
abstract_streams.append(stream_instance._abstract_stream)
61+
if isinstance(stream_instance, AbstractStreamFacade):
62+
abstract_streams.append(stream_instance.get_underlying_stream())
6363
return abstract_streams

airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
1+
from .abstract_file_based_availability_strategy import (
2+
AbstractFileBasedAvailabilityStrategy,
3+
AbstractFileBasedAvailabilityStrategyWrapper,
4+
)
25
from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
36

4-
__all__ = ["AbstractFileBasedAvailabilityStrategy", "DefaultFileBasedAvailabilityStrategy"]
7+
__all__ = ["AbstractFileBasedAvailabilityStrategy", "AbstractFileBasedAvailabilityStrategyWrapper", "DefaultFileBasedAvailabilityStrategy"]

airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88

99
from airbyte_cdk.sources import Source
1010
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
11+
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
12+
AbstractAvailabilityStrategy,
13+
StreamAvailability,
14+
StreamAvailable,
15+
StreamUnavailable,
16+
)
1117
from airbyte_cdk.sources.streams.core import Stream
1218

1319
if TYPE_CHECKING:
@@ -35,3 +41,17 @@ def check_availability_and_parsability(
3541
Returns (True, None) if successful, otherwise (False, <error message>).
3642
"""
3743
...
44+
45+
46+
class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
    """
    Adapter that exposes a file-based stream's availability strategy through the
    `AbstractAvailabilityStrategy` interface.

    Every call is forwarded to `stream.availability_strategy` with the wrapped stream,
    the caller's logger, and `None` as the third positional argument.
    """

    def __init__(self, stream: "AbstractFileBasedStream"):
        # The stream whose own availability strategy does the actual work.
        self.stream = stream

    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Check whether the wrapped stream is available.

        :param logger: logger handed through to the underlying strategy
        :return: `StreamAvailable` when the underlying strategy reports success, otherwise
                 `StreamUnavailable` carrying the reported reason (an empty string when
                 no reason was given)
        """
        strategy = self.stream.availability_strategy
        available, message = strategy.check_availability(self.stream, logger, None)
        return StreamAvailable() if available else StreamUnavailable(message or "")

    def check_availability_and_parsability(self, logger: logging.Logger) -> Tuple[bool, Optional[str]]:
        """
        Forward to the underlying strategy's `check_availability_and_parsability`.

        :param logger: logger handed through to the underlying strategy
        :return: the `(is_available, reason)` tuple produced by the underlying strategy, unchanged
        """
        strategy = self.stream.availability_strategy
        return strategy.check_availability_and_parsability(self.stream, logger, None)

airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,18 @@
88
from collections import Counter
99
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
1010

11-
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
12-
from airbyte_cdk.sources import AbstractSource
11+
from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
12+
from airbyte_cdk.models import (
13+
AirbyteMessage,
14+
AirbyteStateMessage,
15+
ConfiguredAirbyteCatalog,
16+
ConnectorSpecification,
17+
FailureType,
18+
Level,
19+
SyncMode,
20+
)
21+
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
22+
from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
1323
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy, DefaultFileBasedAvailabilityStrategy
1424
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
1525
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, ValidationPolicy
@@ -20,19 +30,33 @@
2030
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
2131
from airbyte_cdk.sources.file_based.schema_validation_policies import DEFAULT_SCHEMA_VALIDATION_POLICIES, AbstractSchemaValidationPolicy
2232
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream
33+
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
34+
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedNoopCursor
2335
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
2436
from airbyte_cdk.sources.file_based.stream.cursor.default_file_based_cursor import DefaultFileBasedCursor
37+
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository
38+
from airbyte_cdk.sources.source import TState
2539
from airbyte_cdk.sources.streams import Stream
2640
from airbyte_cdk.utils.analytics_message import create_analytics_message
41+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2742
from pydantic.error_wrappers import ValidationError
2843

44+
DEFAULT_CONCURRENCY = 100
45+
MAX_CONCURRENCY = 100
46+
INITIAL_N_PARTITIONS = MAX_CONCURRENCY // 2
47+
48+
49+
class FileBasedSource(ConcurrentSourceAdapter, ABC):
50+
# We make each source override the concurrency level to give control over when they are upgraded.
51+
_concurrency_level = None
2952

30-
class FileBasedSource(AbstractSource, ABC):
3153
def __init__(
3254
self,
3355
stream_reader: AbstractFileBasedStreamReader,
3456
spec_class: Type[AbstractFileBasedSpec],
35-
catalog_path: Optional[str] = None,
57+
catalog: Optional[ConfiguredAirbyteCatalog],
58+
config: Optional[Mapping[str, Any]],
59+
state: Optional[TState],
3660
availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None,
3761
discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
3862
parsers: Mapping[Type[Any], FileTypeParser] = default_parsers,
@@ -41,15 +65,29 @@ def __init__(
4165
):
4266
self.stream_reader = stream_reader
4367
self.spec_class = spec_class
68+
self.config = config
69+
self.catalog = catalog
70+
self.state = state
4471
self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(stream_reader)
4572
self.discovery_policy = discovery_policy
4673
self.parsers = parsers
4774
self.validation_policies = validation_policies
48-
catalog = self.read_catalog(catalog_path) if catalog_path else None
4975
self.stream_schemas = {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
5076
self.cursor_cls = cursor_cls
51-
self.logger = logging.getLogger(f"airbyte.{self.name}")
77+
self.logger = init_logger(f"airbyte.{self.name}")
5278
self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()
79+
self._message_repository: Optional[MessageRepository] = None
80+
concurrent_source = ConcurrentSource.create(
81+
MAX_CONCURRENCY, INITIAL_N_PARTITIONS, self.logger, self._slice_logger, self.message_repository
82+
)
83+
self._state = None
84+
super().__init__(concurrent_source)
85+
86+
@property
def message_repository(self) -> MessageRepository:
    """
    Return the message repository for this source, creating it on first access.

    The repository is an `InMemoryMessageRepository` built from the `Level` that
    `AirbyteLogFormatter.level_mapping` associates with this source's logger level.
    It is cached on `self._message_repository`, so later accesses return the same instance.
    """
    cached = self._message_repository
    if cached is not None:
        return cached

    # First access: derive the repository's level from the logger's current level.
    log_level = Level(AirbyteLogFormatter.level_mapping[self.logger.level])
    self._message_repository = InMemoryMessageRepository(log_level)
    return self._message_repository
5391

5492
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
5593
"""
@@ -61,7 +99,15 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
6199
62100
Otherwise, the "error" object should describe what went wrong.
63101
"""
64-
streams = self.streams(config)
102+
try:
103+
streams = self.streams(config)
104+
except Exception as config_exception:
105+
raise AirbyteTracedException(
106+
internal_message="Please check the logged errors for more information.",
107+
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
108+
exception=AirbyteTracedException(exception=config_exception),
109+
failure_type=FailureType.config_error,
110+
)
65111
if len(streams) == 0:
66112
return (
67113
False,
@@ -80,7 +126,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
80126
reason,
81127
) = stream.availability_strategy.check_availability_and_parsability(stream, logger, self)
82128
except Exception:
83-
errors.append(f"Unable to connect to stream {stream} - {''.join(traceback.format_exc())}")
129+
errors.append(f"Unable to connect to stream {stream.name} - {''.join(traceback.format_exc())}")
84130
else:
85131
if not stream_is_available and reason:
86132
errors.append(reason)
@@ -91,10 +137,26 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
91137
"""
92138
Return a list of this source's streams.
93139
"""
140+
file_based_streams = self._get_file_based_streams(config)
141+
142+
configured_streams: List[Stream] = []
143+
144+
for stream in file_based_streams:
145+
sync_mode = self._get_sync_mode_from_catalog(stream)
146+
if sync_mode == SyncMode.full_refresh and hasattr(self, "_concurrency_level") and self._concurrency_level is not None:
147+
configured_streams.append(
148+
FileBasedStreamFacade.create_from_stream(stream, self, self.logger, None, FileBasedNoopCursor(stream.config))
149+
)
150+
else:
151+
configured_streams.append(stream)
152+
153+
return configured_streams
154+
155+
def _get_file_based_streams(self, config: Mapping[str, Any]) -> List[AbstractFileBasedStream]:
94156
try:
95157
parsed_config = self._get_parsed_config(config)
96158
self.stream_reader.config = parsed_config
97-
streams: List[Stream] = []
159+
streams: List[AbstractFileBasedStream] = []
98160
for stream_config in parsed_config.streams:
99161
self._validate_input_schema(stream_config)
100162
streams.append(
@@ -115,6 +177,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
115177
except ValidationError as exc:
116178
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc
117179

180+
def _get_sync_mode_from_catalog(self, stream: Stream) -> Optional[SyncMode]:
    """
    Look up the sync mode configured for `stream` in this source's catalog.

    :param stream: the stream whose name is matched against the catalog entries
    :return: the `sync_mode` of the first catalog entry whose stream name equals
             `stream.name`, or `None` when the source has no catalog
    :raises RuntimeError: when a catalog is present but contains no entry for the stream
    """
    # Without a catalog there is nothing to look up.
    if not self.catalog:
        return None

    for configured in self.catalog.streams:
        if stream.name == configured.stream.name:
            return configured.sync_mode

    raise RuntimeError(f"No sync mode was found for {stream.name}.")
187+
118188
def read(
119189
self,
120190
logger: logging.Logger,

airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from functools import partial
1111
from io import IOBase
1212
from typing import Any, Callable, Dict, Generator, Iterable, List, Mapping, Optional, Set, Tuple
13+
from uuid import uuid4
1314

1415
from airbyte_cdk.models import FailureType
1516
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, CsvHeaderAutogenerated, CsvHeaderUserProvided, InferenceType
@@ -38,8 +39,10 @@ def read_data(
3839

3940
# Formats are configured individually per-stream so a unique dialect should be registered for each stream.
4041
# We don't unregister the dialect because we are lazily parsing each csv file to generate records
41-
# This will potentially be a problem if we ever process multiple streams concurrently
42-
dialect_name = config.name + DIALECT_NAME
42+
# Give each stream's dialect a unique name; otherwise, when we are doing a concurrent sync we can end up
43+
# with a race condition where a thread attempts to use a dialect before a separate thread has finished
44+
# registering it.
45+
dialect_name = f"{config.name}_{str(uuid4())}_{DIALECT_NAME}"
4346
csv.register_dialect(
4447
dialect_name,
4548
delimiter=config_format.delimiter,

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)