Skip to content

Commit c964574

Browse files
maxi297brianjlaicoderabbitai[bot]aaronsteersoctavia-squidington-iii
authored
feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework (#228)
Co-authored-by: brianjlai <[email protected]> Co-authored-by: Brian Lai <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: Aaron ("AJ") Steers <[email protected]> Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 0a12a58 commit c964574

File tree

8 files changed

+168
-48
lines changed

8 files changed

+168
-48
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
3535
ModelToComponentFactory,
3636
)
37+
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
3738
from airbyte_cdk.sources.declarative.requesters import HttpRequester
38-
from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever
39+
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
3940
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
4041
DeclarativePartitionFactory,
4142
StreamSlicerPartitionGenerator,
@@ -48,7 +49,7 @@
4849
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
4950
AlwaysAvailableAvailabilityStrategy,
5051
)
51-
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
52+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
5253
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
5354
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
5455

@@ -69,13 +70,18 @@ def __init__(
6970
component_factory: Optional[ModelToComponentFactory] = None,
7071
**kwargs: Any,
7172
) -> None:
73+
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
74+
# no longer needs to store the original incoming state. But maybe there's an edge case?
75+
self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
76+
7277
# To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
7378
# cursors. We do this by no longer automatically instantiating RFR cursors when converting
7479
# the declarative models into runtime components. Concurrent sources will continue to checkpoint
7580
# incremental streams running in full refresh.
7681
component_factory = component_factory or ModelToComponentFactory(
7782
emit_connector_builder_messages=emit_connector_builder_messages,
7883
disable_resumable_full_refresh=True,
84+
connector_state_manager=self._connector_state_manager,
7985
)
8086

8187
super().__init__(
@@ -86,10 +92,6 @@ def __init__(
8692
component_factory=component_factory,
8793
)
8894

89-
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
90-
# no longer needs to store the original incoming state. But maybe there's an edge case?
91-
self._state = state
92-
9395
concurrency_level_from_manifest = self._source_config.get("concurrency_level")
9496
if concurrency_level_from_manifest:
9597
concurrency_level_component = self._constructor.create_component(
@@ -179,8 +181,6 @@ def _group_streams(
179181
concurrent_streams: List[AbstractStream] = []
180182
synchronous_streams: List[Stream] = []
181183

182-
state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
183-
184184
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
185185
# and this is validated during the initialization of the source.
186186
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
@@ -220,31 +220,52 @@ def _group_streams(
220220
if self._is_datetime_incremental_without_partition_routing(
221221
declarative_stream, incremental_sync_component_definition
222222
):
223-
stream_state = state_manager.get_stream_state(
223+
stream_state = self._connector_state_manager.get_stream_state(
224224
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
225225
)
226226

227-
cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
228-
state_manager=state_manager,
229-
model_type=DatetimeBasedCursorModel,
230-
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
231-
stream_name=declarative_stream.name,
232-
stream_namespace=declarative_stream.namespace,
233-
config=config or {},
234-
stream_state=stream_state,
235-
)
236-
237227
retriever = self._get_retriever(declarative_stream, stream_state)
238228

239-
partition_generator = StreamSlicerPartitionGenerator(
240-
DeclarativePartitionFactory(
241-
declarative_stream.name,
242-
declarative_stream.get_json_schema(),
243-
retriever,
244-
self.message_repository,
245-
),
246-
cursor,
247-
)
229+
if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
230+
declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
231+
):
232+
cursor = declarative_stream.retriever.stream_slicer.stream_slicer
233+
234+
if not isinstance(cursor, ConcurrentCursor):
235+
# This should never happen since we instantiate ConcurrentCursor in
236+
# model_to_component_factory.py
237+
raise ValueError(
238+
f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}"
239+
)
240+
241+
partition_generator = StreamSlicerPartitionGenerator(
242+
partition_factory=DeclarativePartitionFactory(
243+
declarative_stream.name,
244+
declarative_stream.get_json_schema(),
245+
retriever,
246+
self.message_repository,
247+
),
248+
stream_slicer=declarative_stream.retriever.stream_slicer,
249+
)
250+
else:
251+
cursor = (
252+
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
253+
model_type=DatetimeBasedCursorModel,
254+
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
255+
stream_name=declarative_stream.name,
256+
stream_namespace=declarative_stream.namespace,
257+
config=config or {},
258+
)
259+
)
260+
partition_generator = StreamSlicerPartitionGenerator(
261+
partition_factory=DeclarativePartitionFactory(
262+
declarative_stream.name,
263+
declarative_stream.get_json_schema(),
264+
retriever,
265+
self.message_repository,
266+
),
267+
stream_slicer=cursor,
268+
)
248269

249270
concurrent_streams.append(
250271
DefaultStream(
@@ -306,14 +327,14 @@ def _group_streams(
306327
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
307328
)
308329
):
309-
stream_state = state_manager.get_stream_state(
330+
stream_state = self._connector_state_manager.get_stream_state(
310331
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
311332
)
312333
partition_router = declarative_stream.retriever.stream_slicer._partition_router
313334

314335
perpartition_cursor = (
315336
self._constructor.create_concurrent_cursor_from_perpartition_cursor(
316-
state_manager=state_manager,
337+
state_manager=self._connector_state_manager,
317338
model_type=DatetimeBasedCursorModel,
318339
component_definition=incremental_sync_component_definition,
319340
stream_name=declarative_stream.name,
@@ -369,7 +390,10 @@ def _is_datetime_incremental_without_partition_routing(
369390
declarative_stream=declarative_stream
370391
)
371392
and hasattr(declarative_stream.retriever, "stream_slicer")
372-
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
393+
and (
394+
isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
395+
or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
396+
)
373397
)
374398

375399
def _stream_supports_concurrent_partition_processing(
@@ -438,8 +462,9 @@ def _stream_supports_concurrent_partition_processing(
438462
return False
439463
return True
440464

465+
@staticmethod
441466
def _get_retriever(
442-
self, declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
467+
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
443468
) -> Retriever:
444469
retriever = declarative_stream.retriever
445470

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@
2626
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
2727
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
2828
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
29-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
30-
CheckStream as CheckStreamModel,
31-
)
3229
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3330
DeclarativeStream as DeclarativeStreamModel,
3431
)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,7 @@ def __init__(
507507
disable_cache: bool = False,
508508
disable_resumable_full_refresh: bool = False,
509509
message_repository: Optional[MessageRepository] = None,
510+
connector_state_manager: Optional[ConnectorStateManager] = None,
510511
):
511512
self._init_mappings()
512513
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -518,6 +519,7 @@ def __init__(
518519
self._message_repository = message_repository or InMemoryMessageRepository(
519520
self._evaluate_log_level(emit_connector_builder_messages)
520521
)
522+
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
521523

522524
def _init_mappings(self) -> None:
523525
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -927,17 +929,24 @@ def create_concurrency_level(
927929

928930
def create_concurrent_cursor_from_datetime_based_cursor(
929931
self,
930-
state_manager: ConnectorStateManager,
931932
model_type: Type[BaseModel],
932933
component_definition: ComponentDefinition,
933934
stream_name: str,
934935
stream_namespace: Optional[str],
935936
config: Config,
936-
stream_state: MutableMapping[str, Any],
937937
message_repository: Optional[MessageRepository] = None,
938938
runtime_lookback_window: Optional[datetime.timedelta] = None,
939939
**kwargs: Any,
940940
) -> ConcurrentCursor:
941+
# Per-partition incremental streams can dynamically create child cursors which will pass their current
942+
# state via the stream_state keyword argument. Incremental syncs without parent streams use the
943+
# incoming state and connector_state_manager that is initialized when the component factory is created
944+
stream_state = (
945+
self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
946+
if "stream_state" not in kwargs
947+
else kwargs["stream_state"]
948+
)
949+
941950
component_type = component_definition.get("type")
942951
if component_definition.get("type") != model_type.__name__:
943952
raise ValueError(
@@ -1131,7 +1140,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
11311140
stream_namespace=stream_namespace,
11321141
stream_state=stream_state,
11331142
message_repository=message_repository or self._message_repository,
1134-
connector_state_manager=state_manager,
1143+
connector_state_manager=self._connector_state_manager,
11351144
connector_state_converter=connector_state_converter,
11361145
cursor_field=cursor_field,
11371146
slice_boundary_fields=slice_boundary_fields,
@@ -1681,6 +1690,22 @@ def _merge_stream_slicers(
16811690
stream_cursor=cursor_component,
16821691
)
16831692
elif model.incremental_sync:
1693+
if model.retriever.type == "AsyncRetriever":
1694+
if model.incremental_sync.type != "DatetimeBasedCursor":
1695+
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support of unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursors or have a new implementation available in the CDK, we can enable more cursors here.
1696+
raise ValueError(
1697+
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
1698+
)
1699+
if model.retriever.partition_router:
1700+
# Note that this work is being done in parallel with the per-partition development; once that is merged, we could support it here by calling `create_concurrent_cursor_from_perpartition_cursor`
1701+
raise ValueError("Per partition state is not supported yet for AsyncRetriever")
1702+
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
1703+
model_type=DatetimeBasedCursorModel,
1704+
component_definition=model.incremental_sync.__dict__,
1705+
stream_name=model.name or "",
1706+
stream_namespace=None,
1707+
config=config or {},
1708+
)
16841709
return (
16851710
self._create_component_from_model(model=model.incremental_sync, config=config)
16861711
if model.incremental_sync

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition(
7575
"""
7676
if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition:
7777
raise AirbyteTracedException(
78-
message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
78+
message="Invalid arguments to AsyncRetriever.read_records: stream_slice is not optional. Please contact Airbyte Support",
7979
failure_type=FailureType.system_error,
8080
)
8181
return stream_slice["partition"] # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices

airbyte_cdk/sources/types.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView
88

9-
import orjson
9+
from airbyte_cdk.utils.slice_hasher import SliceHasher
1010

1111
# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
1212
# "hello"}] returns "hello"
@@ -151,7 +151,9 @@ def __json_serializable__(self) -> Any:
151151
return self._stream_slice
152152

153153
def __hash__(self) -> int:
154-
return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
154+
return SliceHasher.hash(
155+
stream_slice=self._stream_slice
156+
) # no need to provide stream_name here as this is used for slicing the cursor
155157

156158
def __bool__(self) -> bool:
157159
return bool(self._stream_slice) or bool(self._extra_fields)

airbyte_cdk/utils/slice_hasher.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,14 @@ class SliceHasher:
1616
_ENCODING: Final = "utf-8"
1717

1818
@classmethod
19-
def hash(cls, stream_name: str, stream_slice: Optional[Mapping[str, Any]] = None) -> int:
19+
def hash(
20+
cls,
21+
stream_name: str = "<stream name not provided>",
22+
stream_slice: Optional[Mapping[str, Any]] = None,
23+
) -> int:
24+
"""
25+
Note that partitions from different streams that have the same slice value but different stream names might collide if the stream name is not provided
26+
"""
2027
if stream_slice:
2128
try:
2229
s = json.dumps(stream_slice, sort_keys=True, cls=SliceEncoder)

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,15 @@
1212
import requests
1313

1414
from airbyte_cdk import AirbyteTracedException
15-
from airbyte_cdk.models import FailureType, Level
15+
from airbyte_cdk.models import (
16+
AirbyteStateBlob,
17+
AirbyteStateMessage,
18+
AirbyteStateType,
19+
AirbyteStreamState,
20+
FailureType,
21+
Level,
22+
StreamDescriptor,
23+
)
1624
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1725
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
1826
from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator
@@ -3093,11 +3101,23 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30933101
"legacy": {},
30943102
}
30953103

3096-
connector_state_manager = ConnectorStateManager()
3104+
stream_name = "test"
30973105

3098-
connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
3106+
connector_state_manager = ConnectorStateManager(
3107+
state=[
3108+
AirbyteStateMessage(
3109+
type=AirbyteStateType.STREAM,
3110+
stream=AirbyteStreamState(
3111+
stream_descriptor=StreamDescriptor(name=stream_name),
3112+
stream_state=AirbyteStateBlob(stream_state),
3113+
),
3114+
)
3115+
]
3116+
)
30993117

3100-
stream_name = "test"
3118+
connector_builder_factory = ModelToComponentFactory(
3119+
emit_connector_builder_messages=True, connector_state_manager=connector_state_manager
3120+
)
31013121

31023122
cursor_component_definition = {
31033123
"type": "DatetimeBasedCursor",
@@ -3114,13 +3134,11 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
31143134

31153135
concurrent_cursor = (
31163136
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3117-
state_manager=connector_state_manager,
31183137
model_type=DatetimeBasedCursorModel,
31193138
component_definition=cursor_component_definition,
31203139
stream_name=stream_name,
31213140
stream_namespace=None,
31223141
config=config,
3123-
stream_state=stream_state,
31243142
)
31253143
)
31263144

0 commit comments

Comments
 (0)