Skip to content

Commit 03d1958

Browse files
committed
refactor back to old way
1 parent 92f0940 commit 03d1958

File tree

1 file changed

+32
-22
lines changed

1 file changed

+32
-22
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
)
3737
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
3838
from airbyte_cdk.sources.declarative.requesters import HttpRequester
39-
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever
39+
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
4040
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
4141
DeclarativePartitionFactory,
4242
StreamSlicerPartitionGenerator,
@@ -220,27 +220,15 @@ def _group_streams(
220220
if self._is_datetime_incremental_without_partition_routing(
221221
declarative_stream, incremental_sync_component_definition
222222
):
223-
retriever = declarative_stream.retriever
224-
225-
# This is an optimization so that we don't invoke any cursor or state management flows within the
226-
# low-code framework because state management is handled through the ConcurrentCursor.
227-
if declarative_stream and isinstance(retriever, SimpleRetriever):
228-
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
229-
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
230-
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
231-
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
232-
# still rely on a DatetimeBasedCursor that is properly initialized with state.
233-
if retriever.cursor:
234-
stream_state = self._connector_state_manager.get_stream_state(
235-
stream_name=declarative_stream.name,
236-
namespace=declarative_stream.namespace,
237-
)
238-
retriever.cursor.set_initial_state(stream_state=stream_state)
239-
# We zero it out here, but since this is a cursor reference, the state is still properly
240-
# instantiated for the other components that reference it
241-
retriever.cursor = None
223+
stream_state = self._connector_state_manager.get_stream_state(
224+
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
225+
)
226+
227+
retriever = self._get_retriever(declarative_stream, stream_state)
242228

243-
if isinstance(declarative_stream.retriever, AsyncRetriever):
229+
if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
230+
declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
231+
):
244232
cursor = declarative_stream.retriever.stream_slicer.stream_slicer
245233

246234
if not isinstance(cursor, ConcurrentCursor):
@@ -357,7 +345,7 @@ def _group_streams(
357345
)
358346
)
359347

360-
retriever = declarative_stream.retriever
348+
retriever = self._get_retriever(declarative_stream, stream_state)
361349

362350
partition_generator = StreamSlicerPartitionGenerator(
363351
DeclarativePartitionFactory(
@@ -474,6 +462,28 @@ def _stream_supports_concurrent_partition_processing(
474462
return False
475463
return True
476464

465+
@staticmethod
466+
def _get_retriever(
467+
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
468+
) -> Retriever:
469+
retriever = declarative_stream.retriever
470+
471+
# This is an optimization so that we don't invoke any cursor or state management flows within the
472+
# low-code framework because state management is handled through the ConcurrentCursor.
473+
if declarative_stream and isinstance(retriever, SimpleRetriever):
474+
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
475+
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
476+
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
477+
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
478+
# still rely on a DatetimeBasedCursor that is properly initialized with state.
479+
if retriever.cursor:
480+
retriever.cursor.set_initial_state(stream_state=stream_state)
481+
# We zero it out here, but since this is a cursor reference, the state is still properly
482+
# instantiated for the other components that reference it
483+
retriever.cursor = None
484+
485+
return retriever
486+
477487
@staticmethod
478488
def _select_streams(
479489
streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog

0 commit comments

Comments
 (0)