Skip to content

Commit 040f141

Browse files
authored
[low-code CDK] Rsumable full refresh support for low-code streams (#38300)
1 parent 29723a7 commit 040f141

36 files changed

+1318
-311
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py

+7
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from airbyte_cdk.models import SyncMode
99
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1010
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
11+
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
1112
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1213
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
1314
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
15+
from airbyte_cdk.sources.streams.checkpoint import Cursor
1416
from airbyte_cdk.sources.streams.core import Stream
1517
from airbyte_cdk.sources.types import Config, StreamSlice
1618

@@ -157,3 +159,8 @@ def state_checkpoint_interval(self) -> Optional[int]:
157159
important state is the one at the beginning of the slice
158160
"""
159161
return None
162+
163+
def get_cursor(self) -> Optional[Cursor]:
164+
if self.retriever and isinstance(self.retriever, SimpleRetriever):
165+
return self.retriever.cursor
166+
return None

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@
55
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
66
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
77
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
8+
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor
89

9-
__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor"]
10+
__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor", "ResumableFullRefreshCursor"]

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

+5
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ def stream_slices(self) -> Iterable[StreamSlice]:
168168
start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
169169
return self._partition_daterange(start_datetime, end_datetime, self._step)
170170

171+
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
172+
# Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
173+
# through each slice and does not belong to a specific slice. We just return stream state as it is.
174+
return self.get_stream_state()
175+
171176
def _calculate_earliest_possible_value(self, end_datetime: datetime.datetime) -> datetime.datetime:
172177
lookback_delta = self._parse_timedelta(self._lookback_window.eval(self.config) if self._lookback_window else "P0D")
173178
earliest_possible_start_datetime = min(self._start_datetime.get_datetime(self.config), end_datetime)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from dataclasses import InitVar, dataclass
4+
from typing import Any, Iterable, Mapping, Optional
5+
6+
from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
7+
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
8+
9+
10+
@dataclass
11+
class ResumableFullRefreshCursor(DeclarativeCursor):
12+
parameters: InitVar[Mapping[str, Any]]
13+
14+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
15+
self._cursor: StreamState = {}
16+
17+
def get_stream_state(self) -> StreamState:
18+
return self._cursor
19+
20+
def set_initial_state(self, stream_state: StreamState) -> None:
21+
self._cursor = stream_state
22+
23+
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
24+
"""
25+
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
26+
"""
27+
pass
28+
29+
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
30+
# The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
31+
if stream_slice.partition:
32+
raise ValueError(f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}.")
33+
self._cursor = stream_slice.cursor_slice
34+
35+
def should_be_synced(self, record: Record) -> bool:
36+
"""
37+
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
38+
that don't have filterable bounds. We should always return them.
39+
"""
40+
return True
41+
42+
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
43+
"""
44+
RFR record don't have ordering to be compared between one another.
45+
"""
46+
return False
47+
48+
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
49+
# A top-level RFR cursor only manages the state of a single partition
50+
return self._cursor
51+
52+
def stream_slices(self) -> Iterable[StreamSlice]:
53+
"""
54+
Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
55+
along an unbounded set.
56+
"""
57+
yield from [StreamSlice(cursor_slice=self._cursor, partition={})]
58+
59+
# This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to
60+
# inject any request values into the outbound response because the up-to-date pagination state is already loaded and
61+
# maintained by the paginator component
62+
def get_request_params(
63+
self,
64+
*,
65+
stream_state: Optional[StreamState] = None,
66+
stream_slice: Optional[StreamSlice] = None,
67+
next_page_token: Optional[Mapping[str, Any]] = None,
68+
) -> Mapping[str, Any]:
69+
return {}
70+
71+
def get_request_headers(
72+
self,
73+
*,
74+
stream_state: Optional[StreamState] = None,
75+
stream_slice: Optional[StreamSlice] = None,
76+
next_page_token: Optional[Mapping[str, Any]] = None,
77+
) -> Mapping[str, Any]:
78+
return {}
79+
80+
def get_request_body_data(
81+
self,
82+
*,
83+
stream_state: Optional[StreamState] = None,
84+
stream_slice: Optional[StreamSlice] = None,
85+
next_page_token: Optional[Mapping[str, Any]] = None,
86+
) -> Mapping[str, Any]:
87+
return {}
88+
89+
def get_request_body_json(
90+
self,
91+
*,
92+
stream_state: Optional[StreamState] = None,
93+
stream_slice: Optional[StreamSlice] = None,
94+
next_page_token: Optional[Mapping[str, Any]] = None,
95+
) -> Mapping[str, Any]:
96+
return {}

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
2929
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
3030
from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
31-
from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, DeclarativeCursor, PerPartitionCursor
31+
from airbyte_cdk.sources.declarative.incremental import (
32+
CursorFactory,
33+
DatetimeBasedCursor,
34+
DeclarativeCursor,
35+
PerPartitionCursor,
36+
ResumableFullRefreshCursor,
37+
)
3238
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
3339
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
3440
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
@@ -668,6 +674,10 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -
668674
)
669675
elif model.incremental_sync:
670676
return self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync else None
677+
elif hasattr(model.retriever, "paginator") and model.retriever.paginator and not stream_slicer:
678+
# To incrementally deliver RFR for low-code we're first implementing this for streams that do not use
679+
# nested state like substreams or those using list partition routers
680+
return ResumableFullRefreshCursor(parameters={})
671681
elif stream_slicer:
672682
return stream_slicer
673683
else:

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9999
raise ValueError("page_size_option cannot be set if the pagination strategy does not have a page_size")
100100
if isinstance(self.url_base, str):
101101
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
102-
self._token = self.pagination_strategy.initial_token
102+
self._token: Optional[Any] = self.pagination_strategy.initial_token
103103

104104
def next_page_token(
105105
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
@@ -153,8 +153,11 @@ def get_request_body_json(
153153
) -> Mapping[str, Any]:
154154
return self._get_request_options(RequestOptionType.body_json)
155155

156-
def reset(self) -> None:
157-
self.pagination_strategy.reset()
156+
def reset(self, reset_value: Optional[Any] = None) -> None:
157+
if reset_value:
158+
self.pagination_strategy.reset(reset_value=reset_value)
159+
else:
160+
self.pagination_strategy.reset()
158161
self._token = self.pagination_strategy.initial_token
159162

160163
def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping[str, Any]:
@@ -235,6 +238,6 @@ def get_request_body_json(
235238
) -> Mapping[str, Any]:
236239
return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
237240

238-
def reset(self) -> None:
241+
def reset(self, reset_value: Optional[Any] = None) -> None:
239242
self._decorated.reset()
240243
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ def get_request_body_json(
6060
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]:
6161
return {}
6262

63-
def reset(self) -> None:
63+
def reset(self, reset_value: Optional[Any] = None) -> None:
6464
# No state to reset
6565
pass

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class Paginator(ABC, RequestOptionsProvider):
2121
"""
2222

2323
@abstractmethod
24-
def reset(self) -> None:
24+
def reset(self, reset_value: Optional[Any] = None) -> None:
2525
"""
2626
Reset the pagination's inner state
2727
"""

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class CursorPaginationStrategy(PaginationStrategy):
3535
decoder: Decoder = JsonDecoder(parameters={})
3636

3737
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
38+
self._initial_cursor = None
3839
if isinstance(self.cursor_value, str):
3940
self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
4041
else:
@@ -46,7 +47,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4647

4748
@property
4849
def initial_token(self) -> Optional[Any]:
49-
return None
50+
return self._initial_cursor
5051

5152
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
5253
decoded_response = self.decoder.decode(response)
@@ -74,9 +75,8 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
7475
)
7576
return token if token else None
7677

77-
def reset(self) -> None:
78-
# No state to reset
79-
pass
78+
def reset(self, reset_value: Optional[Any] = None) -> None:
79+
self._initial_cursor = reset_value
8080

8181
def get_page_size(self) -> Optional[int]:
8282
return self.page_size

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,11 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
6666
self._offset += last_page_size
6767
return self._offset
6868

69-
def reset(self) -> None:
70-
self._offset = 0
69+
def reset(self, reset_value: Optional[Any] = 0) -> None:
70+
if not isinstance(reset_value, int):
71+
raise ValueError(f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer")
72+
else:
73+
self._offset = reset_value
7174

7275
def get_page_size(self) -> Optional[int]:
7376
if self._page_size:

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,13 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
5151
self._page += 1
5252
return self._page
5353

54-
def reset(self) -> None:
55-
self._page = self.start_from_page
54+
def reset(self, reset_value: Optional[Any] = None) -> None:
55+
if reset_value is None:
56+
self._page = self.start_from_page
57+
elif not isinstance(reset_value, int):
58+
raise ValueError(f"Reset value {reset_value} for PageIncrement pagination strategy was not an integer")
59+
else:
60+
self._page = reset_value
5661

5762
def get_page_size(self) -> Optional[int]:
5863
return self._page_size

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
3434
pass
3535

3636
@abstractmethod
37-
def reset(self) -> None:
37+
def reset(self, reset_value: Optional[Any] = None) -> None:
3838
"""
3939
Reset the pagination's inner state
4040
"""

0 commit comments

Comments
 (0)