Skip to content

Commit d74125b

Browse files
authored
[RFR for API Sources] New Python interfaces to support resumable full refresh (#37429)
1 parent 3cca1c0 commit d74125b

File tree

24 files changed

+1655
-232
lines changed

24 files changed

+1655
-232
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

+3-16
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
FailureType,
1919
Status,
2020
StreamDescriptor,
21-
SyncMode,
2221
)
2322
from airbyte_cdk.models import Type as MessageType
2423
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
2524
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
2625
from airbyte_cdk.sources.source import Source
27-
from airbyte_cdk.sources.streams import FULL_REFRESH_SENTINEL_STATE_KEY, Stream
26+
from airbyte_cdk.sources.streams import Stream
2827
from airbyte_cdk.sources.streams.core import StreamData
2928
from airbyte_cdk.sources.streams.http.http import HttpStream
3029
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
@@ -211,15 +210,9 @@ def _read_stream(
211210
stream_instance.log_stream_sync_configuration()
212211

213212
stream_name = configured_stream.stream.name
214-
# The platform always passes stream state regardless of sync mode. We shouldn't need to consider this case within the
215-
# connector, but right now we need to prevent accidental usage of the previous stream state
216-
stream_state = (
217-
state_manager.get_stream_state(stream_name, stream_instance.namespace)
218-
if configured_stream.sync_mode == SyncMode.incremental
219-
else {}
220-
)
213+
stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
221214

222-
if stream_state and "state" in dir(stream_instance) and not self._stream_state_is_full_refresh(stream_state):
215+
if "state" in dir(stream_instance):
223216
stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance)
224217
logger.info(f"Setting state of {self.name} stream to {stream_state}")
225218

@@ -275,9 +268,3 @@ def stop_sync_on_stream_failure(self) -> bool:
275268
on the first error seen and emit a single error trace message for that stream.
276269
"""
277270
return False
278-
279-
@staticmethod
280-
def _stream_state_is_full_refresh(stream_state: Mapping[str, Any]) -> bool:
281-
# For full refresh syncs that don't have a suitable cursor value, we emit a state that contains a sentinel key.
282-
# This key is never used by a connector and is needed during a read to skip assigning the incoming state.
283-
return FULL_REFRESH_SENTINEL_STATE_KEY in stream_state

airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def cursor_field(self) -> Union[str, List[str]]:
9898
cursor = self._stream_cursor_field.eval(self.config)
9999
return cursor if cursor else []
100100

101+
@property
def is_resumable(self) -> bool:
    """
    Tell whether this stream can checkpoint its progress.

    A declarative stream always has a state getter/setter, so that alone says nothing about checkpointing support.
    The deciding factor is the retriever: the stream is resumable only when the retriever has a ``cursor`` attribute
    and that attribute is not None.

    :return: True when ``self.retriever.cursor`` exists and is not None, otherwise False
    """
    # A retriever without a `cursor` attribute is treated exactly like one whose cursor is None.
    retriever_cursor = getattr(self.retriever, "cursor", None)
    return retriever_cursor is not None
106+
101107
def read_records(
102108
self,
103109
sync_mode: SyncMode,
@@ -108,7 +114,7 @@ def read_records(
108114
"""
109115
:param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
110116
"""
111-
if stream_slice is None:
117+
if stream_slice is None or stream_slice == {}:
112118
# As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
113119
# As part of the declarative model without custom components, this should never happen as the CDK would wire up a
114120
# SinglePartitionRouter that would create this StreamSlice properly

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from airbyte_cdk.sources.file_based.stream.concurrent.cursor.abstract_concurrent_file_based_cursor import AbstractConcurrentFileBasedCursor
1313
from airbyte_cdk.sources.file_based.types import StreamState
1414
from airbyte_cdk.sources.message import MessageRepository
15-
from airbyte_cdk.sources.streams import FULL_REFRESH_SENTINEL_STATE_KEY
15+
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
1616
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1717
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
1818

@@ -36,7 +36,7 @@ def __init__(
3636

3737
@property
3838
def state(self) -> MutableMapping[str, Any]:
39-
return {FULL_REFRESH_SENTINEL_STATE_KEY: True}
39+
return {NO_CURSOR_STATE_KEY: True}
4040

4141
def observe(self, record: Record) -> None:
4242
pass

airbyte-cdk/python/airbyte_cdk/sources/streams/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
#
44

55
# Initialize Streams Package
6-
from .core import FULL_REFRESH_SENTINEL_STATE_KEY, IncrementalMixin, Stream
6+
from .core import NO_CURSOR_STATE_KEY, IncrementalMixin, CheckpointMixin, Stream
77

8-
__all__ = ["FULL_REFRESH_SENTINEL_STATE_KEY", "IncrementalMixin", "Stream"]
8+
__all__ = ["NO_CURSOR_STATE_KEY", "IncrementalMixin", "CheckpointMixin", "Stream"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
4+
from .checkpoint_reader import CheckpointMode, CheckpointReader, FullRefreshCheckpointReader, IncrementalCheckpointReader, ResumableFullRefreshCheckpointReader
5+
6+
__all__ = ["CheckpointMode", "CheckpointReader", "FullRefreshCheckpointReader", "IncrementalCheckpointReader", "ResumableFullRefreshCheckpointReader"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from abc import ABC, abstractmethod
4+
from enum import Enum
5+
from typing import Any, Iterable, Mapping, Optional
6+
7+
8+
class CheckpointMode(Enum):
    """
    Closed set of checkpointing modes, each identified by a lowercase string value.

    The member names mirror the three CheckpointReader implementations defined in this module.
    """

    # State is tracked across slices that are known before the sync starts.
    INCREMENTAL = "incremental"
    # Full refresh that is able to pick up from a previously emitted state.
    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
    # Full refresh without any intermediate checkpointing.
    FULL_REFRESH = "full_refresh"
12+
13+
14+
class CheckpointReader(ABC):
    """
    Interface for objects that decide how a stream's partitions are iterated and that translate the stream's current
    state into the value that should be emitted back to the platform.
    """

    @abstractmethod
    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Provide the slice to use when fetching the next group of records.

        :return: the next slice, or None once the reader has gone through every slice
        """

    @abstractmethod
    def observe(self, new_state: Mapping[str, Any]) -> None:
        """
        Refresh the reader's internal state from the stream state reported by a connector.

        WARNING: This exists to stay backwards compatible with streams that still rely on the legacy get_stream_state()
        method. To adopt Resumable Full Refresh, connectors must migrate their streams to the state setter/getter methods.

        :param new_state: the latest stream state coming from the connector
        """

    @abstractmethod
    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """
        Return the stream's current state value.

        :return: the state to emit, or None, in which case the connector does not emit a state message
        """
41+
42+
43+
class IncrementalCheckpointReader(CheckpointReader):
    """
    Checkpoint reader for streams that are read through partitioned windows of data, where the windows are determined
    before any data is synced.
    """

    def __init__(self, stream_state: Mapping[str, Any], stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        """
        :param stream_state: the incoming state of the stream, used as the checkpoint until a new state is observed
        :param stream_slices: the slices to hand out, consumed lazily one at a time
        """
        self._has_slices = False
        self._stream_slices = iter(stream_slices)
        self._state: Optional[Mapping[str, Any]] = stream_state

    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Hand out the next slice, or None once the underlying iterator is exhausted.

        NOTE(review): slices are typed as Optional, so a None slice produced by the iterator is returned as-is and
        cannot be told apart from exhaustion by the caller - confirm that callers never supply None slices.
        """
        try:
            upcoming_slice = next(self._stream_slices)
        except StopIteration:
            # The stream already emitted state at the end of every slice, so the checkpoint is cleared here to avoid
            # sending a duplicate state message at the end of the sync. When no slice was ever produced, the
            # checkpoint is left untouched. Accepting a final duplicate state would remove the need for this flag.
            if self._has_slices:
                self._state = None
            return None
        else:
            self._has_slices = True
            return upcoming_slice

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Replace the tracked checkpoint with new_state."""
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Return the tracked checkpoint, which is None after slices were handed out and then exhausted."""
        return self._state
72+
73+
74+
class ResumableFullRefreshCheckpointReader(CheckpointReader):
    """
    Checkpoint reader for streams that iterate over an unbounded set of records according to the stream's pagination
    strategy. Since the number of pages is not known ahead of time, the stream's current state determines whether more
    pages should be fetched or the sync should stop.
    """

    def __init__(self, stream_state: Mapping[str, Any]):
        """
        :param stream_state: the incoming state of the stream; an empty {} marks the first attempt of the stream
        """
        self._state: Mapping[str, Any] = stream_state
        # On the first attempt the incoming state is an empty {}, yet next() must still hand out one slice so that
        # the first page gets read. This flag distinguishes that case from an empty state that means "done".
        self._first_page = bool(stream_state == {})

    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Return the current state as the next slice, or None when the state is an empty {} after the first page.
        """
        if self._first_page:
            self._first_page = False
            return self._state
        # Past the first page, an empty state signals there is nothing left to fetch.
        return None if self._state == {} else self._state

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Replace the tracked state with new_state."""
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Return the tracked state, substituting an empty {} for any falsy state."""
        return self._state or {}
101+
102+
103+
class FullRefreshCheckpointReader(CheckpointReader):
    """
    Checkpoint reader for streams that cannot manage state and therefore cannot be checkpointed incrementally while
    syncing. A single final state message is made available at the end of the sync to signal completion.
    """

    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        """
        :param stream_slices: the slices to hand out, consumed lazily one at a time
        """
        self._final_checkpoint = False
        self._stream_slices = iter(stream_slices)

    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Hand out the next slice, or None once the underlying iterator is exhausted.
        """
        try:
            upcoming_slice = next(self._stream_slices)
        except StopIteration:
            # Running out of slices is what unlocks the final checkpoint in get_checkpoint().
            self._final_checkpoint = True
            return None
        return upcoming_slice

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Ignore new_state; this reader does not track stream state."""
        pass

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Return the sentinel state once every slice has been consumed, and None before that."""
        return {"__ab_no_cursor_state_message": True} if self._final_checkpoint else None

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
99
from airbyte_cdk.sources.message import MessageRepository
10-
from airbyte_cdk.sources.streams import FULL_REFRESH_SENTINEL_STATE_KEY
10+
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
1111
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1212
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
1313
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import AbstractStreamStateConverter
@@ -107,7 +107,7 @@ def __init__(
107107

108108
@property
109109
def state(self) -> MutableMapping[str, Any]:
110-
return {FULL_REFRESH_SENTINEL_STATE_KEY: True}
110+
return {NO_CURSOR_STATE_KEY: True}
111111

112112
def observe(self, record: Record) -> None:
113113
pass

0 commit comments

Comments
 (0)