diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index be9b77517..3293731fd 100644
--- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -34,8 +34,9 @@ from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
     ModelToComponentFactory,
 )
+from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 from airbyte_cdk.sources.declarative.requesters import HttpRequester
-from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever
+from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
     DeclarativePartitionFactory,
     StreamSlicerPartitionGenerator,
@@ -48,7 +49,7 @@ from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
     AlwaysAvailableAvailabilityStrategy,
 )
-from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
+from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
 from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
@@ -69,6 +70,10 @@ def __init__(
         component_factory: Optional[ModelToComponentFactory] = None,
         **kwargs: Any,
     ) -> None:
+        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
+        # no longer needs to store the original incoming state. But maybe there's an edge case?
+        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
+
         # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
         # cursors. We do this by no longer automatically instantiating RFR cursors when converting
         # the declarative models into runtime components. Concurrent sources will continue to checkpoint
@@ -76,6 +81,7 @@ def __init__(
         component_factory = component_factory or ModelToComponentFactory(
             emit_connector_builder_messages=emit_connector_builder_messages,
             disable_resumable_full_refresh=True,
+            connector_state_manager=self._connector_state_manager,
         )
 
         super().__init__(
@@ -86,10 +92,6 @@ def __init__(
             component_factory=component_factory,
         )
 
-        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
-        # no longer needs to store the original incoming state. But maybe there's an edge case?
-        self._state = state
-
         concurrency_level_from_manifest = self._source_config.get("concurrency_level")
         if concurrency_level_from_manifest:
             concurrency_level_component = self._constructor.create_component(
@@ -179,8 +181,6 @@ def _group_streams(
         concurrent_streams: List[AbstractStream] = []
         synchronous_streams: List[Stream] = []
 
-        state_manager = ConnectorStateManager(state=self._state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
-
         # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
         # and this is validated during the initialization of the source.
         streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
@@ -220,31 +220,52 @@ def _group_streams(
             if self._is_datetime_incremental_without_partition_routing(
                 declarative_stream, incremental_sync_component_definition
             ):
-                stream_state = state_manager.get_stream_state(
+                stream_state = self._connector_state_manager.get_stream_state(
                     stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                 )
 
-                cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
-                    state_manager=state_manager,
-                    model_type=DatetimeBasedCursorModel,
-                    component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
-                    stream_name=declarative_stream.name,
-                    stream_namespace=declarative_stream.namespace,
-                    config=config or {},
-                    stream_state=stream_state,
-                )
                 retriever = self._get_retriever(declarative_stream, stream_state)
-                partition_generator = StreamSlicerPartitionGenerator(
-                    DeclarativePartitionFactory(
-                        declarative_stream.name,
-                        declarative_stream.get_json_schema(),
-                        retriever,
-                        self.message_repository,
-                    ),
-                    cursor,
-                )
+
+                if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
+                    declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
+                ):
+                    cursor = declarative_stream.retriever.stream_slicer.stream_slicer
+
+                    if not isinstance(cursor, ConcurrentCursor):
+                        # This should never happen since we instantiate ConcurrentCursor in
+                        # model_to_component_factory.py
+                        raise ValueError(
+                            f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
+                        )
+
+                    partition_generator = StreamSlicerPartitionGenerator(
+                        partition_factory=DeclarativePartitionFactory(
+                            declarative_stream.name,
+                            declarative_stream.get_json_schema(),
+                            retriever,
+                            self.message_repository,
+                        ),
+                        stream_slicer=declarative_stream.retriever.stream_slicer,
+                    )
+                else:
+                    cursor = (
+                        self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
+                            model_type=DatetimeBasedCursorModel,
+                            component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
+                            stream_name=declarative_stream.name,
+                            stream_namespace=declarative_stream.namespace,
+                            config=config or {},
+                        )
+                    )
+                    partition_generator = StreamSlicerPartitionGenerator(
+                        partition_factory=DeclarativePartitionFactory(
+                            declarative_stream.name,
+                            declarative_stream.get_json_schema(),
+                            retriever,
+                            self.message_repository,
+                        ),
+                        stream_slicer=cursor,
+                    )
 
                 concurrent_streams.append(
                     DefaultStream(
@@ -306,14 +327,14 @@ def _group_streams(
                     declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
                 )
             ):
-                stream_state = state_manager.get_stream_state(
+                stream_state = self._connector_state_manager.get_stream_state(
                     stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                 )
                 partition_router = declarative_stream.retriever.stream_slicer._partition_router
 
                 perpartition_cursor = (
                     self._constructor.create_concurrent_cursor_from_perpartition_cursor(
-                        state_manager=state_manager,
+                        state_manager=self._connector_state_manager,
                         model_type=DatetimeBasedCursorModel,
                         component_definition=incremental_sync_component_definition,
                         stream_name=declarative_stream.name,
@@ -369,7 +390,10 @@ def _is_datetime_incremental_without_partition_routing(
                 declarative_stream=declarative_stream
             )
             and hasattr(declarative_stream.retriever, "stream_slicer")
-            and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+            and (
+                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
+            )
         )
 
     def _stream_supports_concurrent_partition_processing(
@@ -438,8 +462,9 @@ def _stream_supports_concurrent_partition_processing(
                     return False
         return True
 
+    @staticmethod
     def _get_retriever(
-        self, declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
+        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
     ) -> Retriever:
         retriever = declarative_stream.retriever
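Reviewer note: to make the new dispatch in `_group_streams` easier to follow, here is a minimal, self-contained sketch of the cursor resolution above. The stub classes are hypothetical stand-ins (the real CDK components take many more constructor arguments); only the isinstance dispatch and the error path mirror the diff.

```python
# Illustrative sketch only: stub classes stand in for the real CDK components.
from dataclasses import dataclass


class ConcurrentCursor:  # stub for airbyte_cdk.sources.streams.concurrent.cursor.ConcurrentCursor
    ...


@dataclass
class AsyncJobPartitionRouter:  # stub; the real router wraps an AsyncJobOrchestrator
    stream_slicer: ConcurrentCursor


@dataclass
class AsyncRetriever:  # stub; the real retriever also carries a record selector
    stream_slicer: AsyncJobPartitionRouter


def resolve_cursor(retriever: object) -> ConcurrentCursor:
    """Mirror of the dispatch above: async retrievers already own their cursor."""
    if isinstance(retriever, AsyncRetriever) and isinstance(
        retriever.stream_slicer, AsyncJobPartitionRouter
    ):
        cursor = retriever.stream_slicer.stream_slicer
        if not isinstance(cursor, ConcurrentCursor):
            raise ValueError(
                f"Expected AsyncJobPartitionRouter stream_slicer to be of type "
                f"ConcurrentCursor, but received {cursor.__class__}"
            )
        return cursor
    raise NotImplementedError("Non-async streams build a fresh cursor via the factory")


cursor = resolve_cursor(AsyncRetriever(AsyncJobPartitionRouter(ConcurrentCursor())))
assert isinstance(cursor, ConcurrentCursor)
```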
diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py
index deef5a3be..78aeac23f 100644
--- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -26,9 +26,6 @@
 from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
-from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
-    CheckStream as CheckStreamModel,
-)
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     DeclarativeStream as DeclarativeStreamModel,
 )
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 172946f17..793a5bb8c 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -507,6 +507,7 @@ def __init__(
         disable_cache: bool = False,
         disable_resumable_full_refresh: bool = False,
         message_repository: Optional[MessageRepository] = None,
+        connector_state_manager: Optional[ConnectorStateManager] = None,
     ):
         self._init_mappings()
         self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -518,6 +519,7 @@ def __init__(
         self._message_repository = message_repository or InMemoryMessageRepository(
             self._evaluate_log_level(emit_connector_builder_messages)
         )
+        self._connector_state_manager = connector_state_manager or ConnectorStateManager()
 
     def _init_mappings(self) -> None:
         self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -927,17 +929,24 @@ def create_concurrency_level(
 
     def create_concurrent_cursor_from_datetime_based_cursor(
         self,
-        state_manager: ConnectorStateManager,
         model_type: Type[BaseModel],
         component_definition: ComponentDefinition,
         stream_name: str,
         stream_namespace: Optional[str],
         config: Config,
-        stream_state: MutableMapping[str, Any],
         message_repository: Optional[MessageRepository] = None,
         runtime_lookback_window: Optional[datetime.timedelta] = None,
         **kwargs: Any,
     ) -> ConcurrentCursor:
+        # Per-partition incremental streams can dynamically create child cursors, which will pass their current
+        # state via the stream_state keyword argument. Incremental syncs without parent streams use the incoming
+        # state and the connector_state_manager that is initialized when the component factory is created.
+        stream_state = (
+            self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
+            if "stream_state" not in kwargs
+            else kwargs["stream_state"]
+        )
+
         component_type = component_definition.get("type")
         if component_definition.get("type") != model_type.__name__:
             raise ValueError(
@@ -1131,7 +1140,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
             stream_namespace=stream_namespace,
             stream_state=stream_state,
             message_repository=message_repository or self._message_repository,
-            connector_state_manager=state_manager,
+            connector_state_manager=self._connector_state_manager,
             connector_state_converter=connector_state_converter,
             cursor_field=cursor_field,
             slice_boundary_fields=slice_boundary_fields,
@@ -1681,6 +1690,22 @@ def _merge_stream_slicers(
                 stream_cursor=cursor_component,
             )
         elif model.incremental_sync:
+            if model.retriever.type == "AsyncRetriever":
+                if model.incremental_sync.type != "DatetimeBasedCursor":
+                    # We are currently transitioning to the Concurrent CDK, and AsyncRetriever requires support
+                    # for unordered slices (for example, when we trigger reports for January and February, the
+                    # February report can complete first). Once we support custom concurrent cursors or have a
+                    # new implementation available in the CDK, we can enable more cursors here.
+                    raise ValueError(
+                        "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
+                    )
+                if model.retriever.partition_router:
+                    # Note that this development is done in parallel to the per-partition development; once that
+                    # is merged, we could support it here by calling `create_concurrent_cursor_from_perpartition_cursor`
+                    raise ValueError("Per partition state is not supported yet for AsyncRetriever")
+                return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore  # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
+                    model_type=DatetimeBasedCursorModel,
+                    component_definition=model.incremental_sync.__dict__,
+                    stream_name=model.name or "",
+                    stream_namespace=None,
+                    config=config or {},
+                )
             return (
                 self._create_component_from_model(model=model.incremental_sync, config=config)
                 if model.incremental_sync
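Reviewer note: the state-resolution rule added to `create_concurrent_cursor_from_datetime_based_cursor` can be summarized with this runnable sketch. `FakeStateManager` is a hypothetical stand-in for `ConnectorStateManager`; only the kwargs fallback mirrors the diff.

```python
# Sketch of the state-resolution rule added above, using a dict-backed
# stand-in for ConnectorStateManager.get_stream_state.
from typing import Any, Mapping, Optional


class FakeStateManager:
    def __init__(self, state: Mapping[str, Mapping[str, Any]]) -> None:
        self._state = state

    def get_stream_state(
        self, stream_name: str, namespace: Optional[str]
    ) -> Mapping[str, Any]:
        return self._state.get(stream_name, {})


def resolve_stream_state(
    state_manager: FakeStateManager,
    stream_name: str,
    stream_namespace: Optional[str],
    **kwargs: Any,
) -> Mapping[str, Any]:
    # Child per-partition cursors pass their own state explicitly; everything
    # else falls back to the manager built from the incoming connector state.
    return (
        kwargs["stream_state"]
        if "stream_state" in kwargs
        else state_manager.get_stream_state(stream_name, stream_namespace)
    )


manager = FakeStateManager({"orders": {"updated_at": "2024-08-06"}})
assert resolve_stream_state(manager, "orders", None) == {"updated_at": "2024-08-06"}
assert resolve_stream_state(manager, "orders", None, stream_state={}) == {}
```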
diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py
index 1b8860289..bd28e0e2d 100644
--- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py
@@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition(
         """
         if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition:
             raise AirbyteTracedException(
-                message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
+                message="Invalid arguments to AsyncRetriever.read_records: stream_slice is not optional. Please contact Airbyte Support",
                 failure_type=FailureType.system_error,
             )
         return stream_slice["partition"]  # type: ignore  # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices
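Reviewer note: the validation touched above enforces that `read_records` receives a `StreamSlice` whose partition mapping carries a `"partition"` entry. A small illustration of that contract; the string `"job-123"` is a hypothetical stand-in for the `AsyncPartition` object that `stream_slices` actually injects.

```python
# Minimal illustration of the contract enforced above: read_records requires
# a StreamSlice whose partition mapping contains the async "partition" entry.
from airbyte_cdk.sources.types import StreamSlice

valid = StreamSlice(partition={"partition": "job-123"}, cursor_slice={})
assert "partition" in valid.partition  # passes the validation above

invalid = StreamSlice(partition={}, cursor_slice={})
assert "partition" not in invalid.partition  # would raise AirbyteTracedException
```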
diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py
index 3c466ccd8..d4db76f87 100644
--- a/airbyte_cdk/sources/types.py
+++ b/airbyte_cdk/sources/types.py
@@ -6,7 +6,7 @@
 
 from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView
 
-import orjson
+from airbyte_cdk.utils.slice_hasher import SliceHasher
 
 # A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
 # "hello"}] returns "hello"
@@ -151,7 +151,9 @@ def __json_serializable__(self) -> Any:
         return self._stream_slice
 
     def __hash__(self) -> int:
-        return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
+        return SliceHasher.hash(
+            stream_slice=self._stream_slice
+        )  # no need to provide stream_name here as the hash is only used for slicing within a single cursor
 
     def __bool__(self) -> bool:
         return bool(self._stream_slice) or bool(self._extra_fields)
diff --git a/airbyte_cdk/utils/slice_hasher.py b/airbyte_cdk/utils/slice_hasher.py
index 7f46dd768..25950a934 100644
--- a/airbyte_cdk/utils/slice_hasher.py
+++ b/airbyte_cdk/utils/slice_hasher.py
@@ -16,7 +16,14 @@ class SliceHasher:
     _ENCODING: Final = "utf-8"
 
     @classmethod
-    def hash(cls, stream_name: str, stream_slice: Optional[Mapping[str, Any]] = None) -> int:
+    def hash(
+        cls,
+        stream_name: str = "",
+        stream_slice: Optional[Mapping[str, Any]] = None,
+    ) -> int:
+        """
+        Note that partitions from different streams with the same slicing value may collide if the stream name is not provided.
+        """
         if stream_slice:
             try:
                 s = json.dumps(stream_slice, sort_keys=True, cls=SliceEncoder)
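Reviewer note: a quick sketch of the hashing behavior and the caveat called out in the new docstring. The first pair of asserts follows from the `sort_keys=True` serialization shown in the hunk; the second pair assumes the stream name is folded into the hash input, which the docstring implies but the hunk does not show.

```python
# Sketch of the new hashing path and its documented caveat: without a
# stream_name, identical slice values from different streams hash identically.
from airbyte_cdk.utils.slice_hasher import SliceHasher

h1 = SliceHasher.hash(stream_slice={"start": "2024-07-01", "end": "2024-08-06"})
h2 = SliceHasher.hash(stream_slice={"end": "2024-08-06", "start": "2024-07-01"})
assert h1 == h2  # key order does not matter: json.dumps(..., sort_keys=True)

# Supplying the stream name disambiguates otherwise-identical slices
# (assuming the name is part of the hash input, as the docstring implies).
h3 = SliceHasher.hash(stream_name="stream_a", stream_slice={"id": 1})
h4 = SliceHasher.hash(stream_name="stream_b", stream_slice={"id": 1})
assert h3 != h4
```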
diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
index 12edb32b1..3ee744e09 100644
--- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
+++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
@@ -12,7 +12,15 @@
 import requests
 
 from airbyte_cdk import AirbyteTracedException
-from airbyte_cdk.models import FailureType, Level
+from airbyte_cdk.models import (
+    AirbyteStateBlob,
+    AirbyteStateMessage,
+    AirbyteStateType,
+    AirbyteStreamState,
+    FailureType,
+    Level,
+    StreamDescriptor,
+)
 from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
 from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator
@@ -3093,11 +3101,23 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
         "legacy": {},
     }
 
-    connector_state_manager = ConnectorStateManager()
+    stream_name = "test"
 
-    connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
+    connector_state_manager = ConnectorStateManager(
+        state=[
+            AirbyteStateMessage(
+                type=AirbyteStateType.STREAM,
+                stream=AirbyteStreamState(
+                    stream_descriptor=StreamDescriptor(name=stream_name),
+                    stream_state=AirbyteStateBlob(stream_state),
+                ),
+            )
+        ]
+    )
 
-    stream_name = "test"
+    connector_builder_factory = ModelToComponentFactory(
+        emit_connector_builder_messages=True, connector_state_manager=connector_state_manager
+    )
 
     cursor_component_definition = {
         "type": "DatetimeBasedCursor",
@@ -3114,13 +3134,11 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
 
     concurrent_cursor = (
         connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
-            state_manager=connector_state_manager,
             model_type=DatetimeBasedCursorModel,
             component_definition=cursor_component_definition,
             stream_name=stream_name,
             stream_namespace=None,
             config=config,
-            stream_state=stream_state,
         )
     )
diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
index ce88804c4..a8c9f77ba 100644
--- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py
+++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
@@ -33,6 +33,10 @@
     ConcurrentDeclarativeSource,
 )
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
+from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
+    StreamSlicerPartitionGenerator,
+)
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.checkpoint import Cursor
 from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
@@ -322,6 +326,7 @@
             "http_method": "GET",
         },
     },
+    "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
     "schema_loader": {
         "type": "InlineSchemaLoader",
         "schema": {
@@ -1601,6 +1606,47 @@ def test_given_partition_routing_and_incremental_sync_then_stream_is_concurrent(
     assert len(synchronous_streams) == 0
 
 
+def test_async_incremental_stream_uses_concurrent_cursor_with_state():
+    state = [
+        AirbyteStateMessage(
+            type=AirbyteStateType.STREAM,
+            stream=AirbyteStreamState(
+                stream_descriptor=StreamDescriptor(name="async_job_stream", namespace=None),
+                stream_state=AirbyteStateBlob(updated_at="2024-08-06"),
+            ),
+        )
+    ]
+
+    source = ConcurrentDeclarativeSource(
+        source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state
+    )
+
+    expected_state = {
+        "legacy": {"updated_at": "2024-08-06"},
+        "slices": [
+            {
+                "end": datetime(2024, 8, 6, 0, 0, tzinfo=timezone.utc),
+                "most_recent_cursor_value": datetime(2024, 8, 6, 0, 0, tzinfo=timezone.utc),
+                "start": datetime(2024, 7, 1, 0, 0, tzinfo=timezone.utc),
+            }
+        ],
+        "state_type": "date-range",
+    }
+
+    concurrent_streams, _ = source._group_streams(config=_CONFIG)
+    async_job_stream = concurrent_streams[6]
+    assert isinstance(async_job_stream, DefaultStream)
+    cursor = async_job_stream._cursor
+    assert isinstance(cursor, ConcurrentCursor)
+    assert cursor._concurrent_state == expected_state
+
+    stream_partition_generator = async_job_stream._stream_partition_generator
+    assert isinstance(stream_partition_generator, StreamSlicerPartitionGenerator)
+    async_job_partition_router = stream_partition_generator._stream_slicer
+    assert isinstance(async_job_partition_router, AsyncJobPartitionRouter)
+    assert isinstance(async_job_partition_router.stream_slicer, ConcurrentCursor)
+    assert async_job_partition_router.stream_slicer._concurrent_state == expected_state
+
+
 def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
     slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name)
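Reviewer note: with these changes, callers hand connector state to the factory once at construction time instead of threading `state_manager`/`stream_state` through each `create_concurrent_cursor_from_datetime_based_cursor` call. A minimal wiring sketch, mirroring the updated test above:

```python
# Sketch: the factory now receives connector state at construction time.
from airbyte_cdk.models import (
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)

state = [
    AirbyteStateMessage(
        type=AirbyteStateType.STREAM,
        stream=AirbyteStreamState(
            stream_descriptor=StreamDescriptor(name="test"),
            stream_state=AirbyteStateBlob({"updated_at": "2024-08-06"}),
        ),
    )
]

# Cursors created by this factory resolve their stream state internally.
factory = ModelToComponentFactory(
    connector_state_manager=ConnectorStateManager(state=state)
)
```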