Skip to content

Commit acbdc2d

Browse files
Introduce FinalStateCursor to emit state messages at the end of full refresh syncs (#35905)
Co-authored-by: brianjlai <[email protected]>
1 parent 23ac2cd commit acbdc2d

16 files changed

+113
-61
lines changed

airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
3737
AbstractConcurrentFileBasedCursor,
3838
FileBasedConcurrentCursor,
39-
FileBasedNoopCursor,
39+
FileBasedFinalStateCursor,
4040
)
4141
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
4242
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository
@@ -170,7 +170,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
170170
sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
171171

172172
if sync_mode == SyncMode.full_refresh and hasattr(self, "_concurrency_level") and self._concurrency_level is not None:
173-
cursor = FileBasedNoopCursor(stream_config)
173+
cursor = FileBasedFinalStateCursor(
174+
stream_config=stream_config, stream_namespace=None, message_repository=self.message_repository
175+
)
174176
stream = FileBasedStreamFacade.create_from_stream(
175177
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
176178
)

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
1919
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
2020
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
21-
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedNoopCursor
21+
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
2222
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
2323
from airbyte_cdk.sources.file_based.types import StreamSlice
2424
from airbyte_cdk.sources.message import MessageRepository
@@ -71,7 +71,7 @@ def create_from_stream(
7171
partition_generator=FileBasedStreamPartitionGenerator(
7272
stream,
7373
message_repository,
74-
SyncMode.full_refresh if isinstance(cursor, FileBasedNoopCursor) else SyncMode.incremental,
74+
SyncMode.full_refresh if isinstance(cursor, FileBasedFinalStateCursor) else SyncMode.incremental,
7575
[cursor_field] if cursor_field is not None else None,
7676
state,
7777
cursor,
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .abstract_concurrent_file_based_cursor import AbstractConcurrentFileBasedCursor
22
from .file_based_concurrent_cursor import FileBasedConcurrentCursor
3-
from .file_based_noop_cursor import FileBasedNoopCursor
3+
from .file_based_final_state_cursor import FileBasedFinalStateCursor
44

5-
__all__ = ["AbstractConcurrentFileBasedCursor", "FileBasedConcurrentCursor", "FileBasedNoopCursor"]
5+
__all__ = ["AbstractConcurrentFileBasedCursor", "FileBasedConcurrentCursor", "FileBasedFinalStateCursor"]

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_noop_cursor.py renamed to airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py

+21-6
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,39 @@
44

55
import logging
66
from datetime import datetime
7-
from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping
7+
from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping, Optional
88

9+
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
910
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
1011
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
1112
from airbyte_cdk.sources.file_based.stream.concurrent.cursor.abstract_concurrent_file_based_cursor import AbstractConcurrentFileBasedCursor
1213
from airbyte_cdk.sources.file_based.types import StreamState
14+
from airbyte_cdk.sources.message import MessageRepository
15+
from airbyte_cdk.sources.streams import FULL_REFRESH_SENTINEL_STATE_KEY
1316
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1417
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
1518

1619
if TYPE_CHECKING:
1720
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition
1821

1922

20-
class FileBasedNoopCursor(AbstractConcurrentFileBasedCursor):
21-
def __init__(self, stream_config: FileBasedStreamConfig, **kwargs: Any):
22-
pass
23+
class FileBasedFinalStateCursor(AbstractConcurrentFileBasedCursor):
24+
"""Cursor that is used to guarantee at least one state message is emitted for a concurrent file-based stream."""
25+
26+
def __init__(
27+
self, stream_config: FileBasedStreamConfig, message_repository: MessageRepository, stream_namespace: Optional[str], **kwargs: Any
28+
):
29+
self._stream_name = stream_config.name
30+
self._stream_namespace = stream_namespace
31+
self._message_repository = message_repository
32+
# Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
33+
# state message rather than manage overall source state. This is also only temporary as we move to the resumable
34+
# full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
35+
self._connector_state_manager = ConnectorStateManager(stream_instance_map={})
2336

2437
@property
2538
def state(self) -> MutableMapping[str, Any]:
26-
return {}
39+
return {FULL_REFRESH_SENTINEL_STATE_KEY: True}
2740

2841
def observe(self, record: Record) -> None:
2942
pass
@@ -53,4 +66,6 @@ def emit_state_message(self) -> None:
5366
pass
5467

5568
def ensure_at_least_one_state_emitted(self) -> None:
56-
pass
69+
self._connector_state_manager.update_state_for_stream(self._stream_name, self._stream_namespace, self.state)
70+
state_message = self._connector_state_manager.create_state_message(self._stream_name, self._stream_namespace)
71+
self._message_repository.emit_message(state_message)

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/adapters.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
StreamAvailable,
2222
StreamUnavailable,
2323
)
24-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, NoopCursor
24+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
2525
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
2626
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
2727
from airbyte_cdk.sources.streams.concurrent.helpers import get_cursor_field_from_stream, get_primary_key_from_stream
@@ -77,7 +77,7 @@ def create_from_stream(
7777
partition_generator=StreamPartitionGenerator(
7878
stream,
7979
message_repository,
80-
SyncMode.full_refresh if isinstance(cursor, NoopCursor) else SyncMode.incremental,
80+
SyncMode.full_refresh if isinstance(cursor, FinalStateCursor) else SyncMode.incremental,
8181
[cursor_field] if cursor_field is not None else None,
8282
state,
8383
cursor,

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py

+27-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1010
from airbyte_cdk.sources.message import MessageRepository
11+
from airbyte_cdk.sources.streams import FULL_REFRESH_SENTINEL_STATE_KEY
1112
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1213
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
1314
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import AbstractStreamStateConverter
@@ -65,10 +66,27 @@ def ensure_at_least_one_state_emitted(self) -> None:
6566
raise NotImplementedError()
6667

6768

68-
class NoopCursor(Cursor):
69+
class FinalStateCursor(Cursor):
70+
"""Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
71+
72+
def __init__(
73+
self,
74+
stream_name: str,
75+
stream_namespace: Optional[str],
76+
message_repository: MessageRepository,
77+
) -> None:
78+
self._stream_name = stream_name
79+
self._stream_namespace = stream_namespace
80+
self._message_repository = message_repository
81+
# Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
82+
# state message rather than manage overall source state. This is also only temporary as we move to the resumable
83+
# full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
84+
self._connector_state_manager = ConnectorStateManager(stream_instance_map={})
85+
self._has_closed_at_least_one_slice = False
86+
6987
@property
7088
def state(self) -> MutableMapping[str, Any]:
71-
return {}
89+
return {FULL_REFRESH_SENTINEL_STATE_KEY: True}
7290

7391
def observe(self, record: Record) -> None:
7492
pass
@@ -77,7 +95,13 @@ def close_partition(self, partition: Partition) -> None:
7795
pass
7896

7997
def ensure_at_least_one_state_emitted(self) -> None:
80-
pass
98+
"""
99+
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
100+
"""
101+
102+
self._connector_state_manager.update_state_for_stream(self._stream_name, self._stream_namespace, self.state)
103+
state_message = self._connector_state_manager.create_state_message(self._stream_name, self._stream_namespace)
104+
self._message_repository.emit_message(state_message)
81105

82106

83107
class ConcurrentCursor(Cursor):

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from airbyte_cdk.models import AirbyteStream, SyncMode
1010
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1111
from airbyte_cdk.sources.streams.concurrent.availability_strategy import AbstractAvailabilityStrategy, StreamAvailability
12-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, NoopCursor
12+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1313
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1414
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
1515

@@ -24,7 +24,7 @@ def __init__(
2424
primary_key: List[str],
2525
cursor_field: Optional[str],
2626
logger: Logger,
27-
cursor: Optional[Cursor],
27+
cursor: Cursor,
2828
namespace: Optional[str] = None,
2929
) -> None:
3030
self._stream_partition_generator = partition_generator
@@ -34,7 +34,7 @@ def __init__(
3434
self._primary_key = primary_key
3535
self._cursor_field = cursor_field
3636
self._logger = logger
37-
self._cursor = cursor or NoopCursor()
37+
self._cursor = cursor
3838
self._namespace = namespace
3939

4040
def generate_partitions(self) -> Iterable[Partition]:
@@ -44,6 +44,10 @@ def generate_partitions(self) -> Iterable[Partition]:
4444
def name(self) -> str:
4545
return self._name
4646

47+
@property
48+
def namespace(self) -> Optional[str]:
49+
return self._namespace
50+
4751
def check_availability(self) -> StreamAvailability:
4852
return self._availability_strategy.check_availability(self._logger)
4953

airbyte-cdk/python/unit_tests/sources/concurrent_source/test_concurrent_source_adapter.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from airbyte_cdk.sources.message import InMemoryMessageRepository
2121
from airbyte_cdk.sources.streams import Stream
2222
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
23-
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
23+
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
2424

2525

2626
class _MockSource(ConcurrentSourceAdapter):
@@ -36,7 +36,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
3636

3737
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
3838
return [
39-
StreamFacade.create_from_stream(s, self, self._logger, None, NoopCursor()) if is_concurrent else s
39+
StreamFacade.create_from_stream(s, self, self._logger, None, FinalStateCursor(stream_name=s.name, stream_namespace=s.namespace, message_repository=InMemoryMessageRepository())) if is_concurrent else s
4040
for s, is_concurrent in self._streams_to_is_concurrent.items()
4141
]
4242

airbyte-cdk/python/unit_tests/sources/file_based/stream/concurrent/test_adapters.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
FileBasedStreamPartition,
2424
FileBasedStreamPartitionGenerator,
2525
)
26-
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedNoopCursor
26+
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
2727
from airbyte_cdk.sources.message import InMemoryMessageRepository
2828
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
2929
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
@@ -36,7 +36,7 @@
3636
_ANY_STATE = {"state_key": "state_value"}
3737
_ANY_CURSOR_FIELD = ["a", "cursor", "key"]
3838
_STREAM_NAME = "stream"
39-
_ANY_CURSOR = Mock(spec=FileBasedNoopCursor)
39+
_ANY_CURSOR = Mock(spec=FileBasedFinalStateCursor)
4040

4141

4242
@pytest.mark.parametrize(
@@ -165,7 +165,7 @@ def setUp(self):
165165
supported_sync_modes=[SyncMode.full_refresh],
166166
)
167167
self._legacy_stream = DefaultFileBasedStream(
168-
cursor=FileBasedNoopCursor(MagicMock()),
168+
cursor=FileBasedFinalStateCursor(stream_config=MagicMock(), stream_namespace=None, message_repository=Mock()),
169169
config=FileBasedStreamConfig(name="stream", format=CsvFormat()),
170170
catalog_schema={},
171171
stream_reader=MagicMock(),

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from airbyte_cdk.sources.source import TState
2222
from airbyte_cdk.sources.streams import Stream
2323
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
24-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, NoopCursor
24+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, FinalStateCursor
2525
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter
2626
from airbyte_protocol.models import ConfiguredAirbyteStream
2727
from unit_tests.sources.file_based.scenarios.scenario_builder import SourceBuilder
@@ -83,7 +83,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
8383
None,
8484
)
8585
if self._cursor_field
86-
else NoopCursor(),
86+
else FinalStateCursor(stream_name=stream.name, stream_namespace=stream.namespace, message_repository=self.message_repository),
8787
)
8888
for stream, state in zip(self._streams, stream_states)
8989
]

0 commit comments

Comments
 (0)