Skip to content

Commit 2fa35ab

Browse files
authored
feat(airbyte-cdk): Add Global Parent State Cursor (#39593)
1 parent 04f40f9 commit 2fa35ab

File tree

11 files changed

+903
-12
lines changed

11 files changed

+903
-12
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,11 @@ definitions:
815815
description: Set to True if the target API does not accept queries where the start time equal the end time.
816816
type: boolean
817817
default: False
818+
global_substream_cursor:
819+
title: Whether to store cursor as one value instead of per partition
820+
description: This setting optimizes performance when the parent stream has thousands of partitions by storing the cursor as a single value rather than per partition. Notably, the substream state is updated only at the end of the sync, which helps prevent data loss in case of a sync failure. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/incremental-syncs).
821+
type: boolean
822+
default: false
818823
lookback_window:
819824
title: Lookback Window
820825
description: Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,16 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
4848
"""
4949

5050
def __init__(
    self,
    date_time_based_cursor: DatetimeBasedCursor,
    per_partition_cursor: Optional[PerPartitionCursor] = None,
    is_global_substream_cursor: bool = False,
    **kwargs: Any,
):
    """
    :param date_time_based_cursor: Cursor providing the datetime boundaries used for filtering.
    :param per_partition_cursor: Optional per-partition cursor; when set, state is looked up per partition.
    :param is_global_substream_cursor: When True, the stream state is stored as a single global value
        (nested under a "state" key) instead of per partition.
    """
    super().__init__(**kwargs)
    self.is_global_substream_cursor = is_global_substream_cursor
    self._per_partition_cursor = per_partition_cursor
    self._date_time_based_cursor = date_time_based_cursor
5661

5762
@property
5863
def _cursor_field(self) -> str:
@@ -102,6 +107,10 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice)
102107
# self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
103108
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
104109
return partition_state.get(self._cursor_field) if partition_state else None
110+
111+
if self.is_global_substream_cursor:
112+
return stream_state.get("state", {}).get(self._cursor_field) # type: ignore # state is inside a dict for GlobalSubstreamCursor
113+
105114
return stream_state.get(self._cursor_field)
106115

107116
def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,16 @@
44

55
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
66
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
7+
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor
78
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
89
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor, ChildPartitionResumableFullRefreshCursor
910

10-
__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor", "ResumableFullRefreshCursor", "ChildPartitionResumableFullRefreshCursor"]
11+
__all__ = [
12+
"CursorFactory",
13+
"DatetimeBasedCursor",
14+
"DeclarativeCursor",
15+
"GlobalSubstreamCursor",
16+
"PerPartitionCursor",
17+
"ResumableFullRefreshCursor",
18+
"ChildPartitionResumableFullRefreshCursor"
19+
]

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import datetime
66
from dataclasses import InitVar, dataclass, field
7+
from datetime import timedelta
78
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Union
89

910
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
@@ -15,7 +16,7 @@
1516
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
1617
from airbyte_cdk.sources.message import MessageRepository
1718
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
18-
from isodate import Duration, parse_duration
19+
from isodate import Duration, duration_isoformat, parse_duration
1920

2021

2122
@dataclass
@@ -363,3 +364,17 @@ def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
363364
return True
364365
else:
365366
return False
367+
368+
def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
    """
    Updates the lookback window based on a given number of seconds if the new duration
    is greater than the currently configured lookback window.

    :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
    """
    candidate_iso = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
    configured = self._lookback_window.eval(self.config) if self._lookback_window else "P0D"

    # Only widen the window; never shrink below what the connector config asked for.
    if parse_duration(candidate_iso) > parse_duration(configured):
        self._lookback_window = InterpolatedString.create(candidate_iso, parameters={})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import threading
6+
import time
7+
from typing import Any, Iterable, Mapping, Optional, Union
8+
9+
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
10+
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
11+
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
12+
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
13+
14+
15+
class Timer:
    """
    A simple timer class that measures elapsed time in seconds using a high-resolution performance counter.
    """

    def __init__(self) -> None:
        # Nanosecond timestamp captured by start(); None until start() is called.
        self._start: Optional[int] = None

    def start(self) -> None:
        """Record the current time as the measurement origin."""
        self._start = time.perf_counter_ns()

    def finish(self) -> int:
        """
        Return the number of whole seconds elapsed since start().

        Raises:
            RuntimeError: If start() was never called.
        """
        # Compare against None explicitly: perf_counter_ns() has an unspecified
        # origin and could legitimately be 0, which a truthiness check would
        # misread as "not started".
        if self._start is not None:
            return int((time.perf_counter_ns() - self._start) // 1e9)
        else:
            raise RuntimeError("Global substream cursor timer not started")
31+
32+
33+
class GlobalSubstreamCursor(DeclarativeCursor):
    """
    The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor.
    This class is beneficial for streams with many partitions, as it allows the state to be managed globally
    instead of per partition, simplifying state management and reducing the size of state messages.

    This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.

    Warnings:
    - This class enforces a minimal lookback window for substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
    - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
    - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
    """

    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
        self._stream_cursor = stream_cursor
        self._partition_router = partition_router
        self._timer = Timer()
        self._lock = threading.Lock()
        # Counts yielded-but-not-yet-closed slices; starts at 0, indicating no slices being tracked.
        self._slice_semaphore = threading.Semaphore(0)
        self._all_slices_yielded = False
        # Duration of the previous sync in seconds, persisted in state to widen the next sync's window.
        self._lookback_window: Optional[int] = None

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates stream slices, ensuring the last slice is properly flagged and processed.

        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
        is called only after all slices have been processed.

        We expect the following events:
        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
        """
        previous_slice = None

        slice_generator = (
            StreamSlice(partition=partition, cursor_slice=cursor_slice)
            for partition in self._partition_router.stream_slices()
            for cursor_slice in self._stream_cursor.stream_slices()
        )
        self._timer.start()

        # `current_slice` instead of `slice` to avoid shadowing the builtin.
        for current_slice in slice_generator:
            if previous_slice is not None:
                # Release the semaphore to indicate that a slice has been yielded
                self._slice_semaphore.release()
                yield previous_slice

            # Store the current slice as the previous slice for the next iteration
            previous_slice = current_slice

        # After all slices have been generated, release the semaphore one final time
        # and flag that all slices have been yielded
        self._slice_semaphore.release()
        self._all_slices_yielded = True

        # Yield the last slice
        if previous_slice is not None:
            yield previous_slice

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the initial state for the cursors.

        This method initializes the state for the global cursor using the provided stream state.

        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    },
                    "lookback_window": 132
                }
        """
        if not stream_state:
            return

        if "lookback_window" in stream_state:
            self._lookback_window = stream_state["lookback_window"]
            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])

        # NOTE(review): a non-empty state without a "state" key raises KeyError here —
        # confirm upstream always nests the cursor value under "state".
        self._stream_cursor.set_initial_state(stream_state["state"])

        # Set parent state for partition routers based on parent streams
        self._partition_router.set_initial_state(stream_state)

    def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
        """
        Modifies the stream cursor's lookback window based on the duration of the previous sync.
        This adjustment ensures the cursor is set to the minimal lookback window necessary for
        avoiding missing data.

        Parameters:
            lookback_window (int): The lookback duration in seconds to be set, derived from
                the previous sync.

        Raises:
            ValueError: If the cursor does not support dynamic lookback window adjustments.
        """
        if hasattr(self._stream_cursor, "set_runtime_lookback_window"):
            self._stream_cursor.set_runtime_lookback_window(lookback_window)
        else:
            raise ValueError("The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method")

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        # The global cursor only cares about the cursor portion of the slice, so the partition is dropped.
        self._stream_cursor.observe(StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record)

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Close the current stream slice.

        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
        only after reading all slices. This ensures that we do not miss any child records from a later parent record
        if the child cursor is earlier than a record from the first parent record.

        Args:
            stream_slice (StreamSlice): The stream slice to be closed.
            *args (Any): Additional arguments.
        """
        with self._lock:
            self._slice_semaphore.acquire()
            # `_value` is a CPython implementation detail of threading.Semaphore;
            # it is 0 exactly when every yielded slice has been acquired (closed).
            if self._all_slices_yielded and self._slice_semaphore._value == 0:
                self._lookback_window = self._timer.finish()
                self._stream_cursor.close_slice(StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args)

    def get_stream_state(self) -> StreamState:
        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}

        parent_state = self._partition_router.get_stream_state()
        if parent_state:
            state["parent_state"] = parent_state

        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window

        return state

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # stream_slice is ignored as cursor is global
        return self._stream_cursor.get_stream_state()

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_params(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request params")

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_headers(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request headers")

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        if stream_slice:
            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_body_data(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body data")

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_body_json(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body json")

    def should_be_synced(self, record: Record) -> bool:
        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        return self._stream_cursor.is_greater_than_or_equal(
            self._convert_record_to_cursor_record(first), self._convert_record_to_cursor_record(second)
        )

    @staticmethod
    def _convert_record_to_cursor_record(record: Record) -> Record:
        # Strip the partition so comparisons happen against the global cursor slice only.
        return Record(
            record.data,
            StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice) if record.associated_slice else None,
        )

airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+5
Original file line numberDiff line numberDiff line change
@@ -1107,6 +1107,11 @@ class DatetimeBasedCursor(BaseModel):
11071107
description='Set to True if the target API does not accept queries where the start time equal the end time.',
11081108
title='Whether to skip requests if the start time equals the end time',
11091109
)
1110+
global_substream_cursor: Optional[bool] = Field(
1111+
False,
1112+
description='This setting optimizes performance when the parent stream has thousands of partitions by storing the cursor as a single value rather than per partition. Notably, the substream state is updated only at the end of the sync, which helps prevent data loss in case of a sync failure. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/incremental-syncs).',
1113+
title='Whether to store cursor as one value instead of per partition',
1114+
)
11101115
lookback_window: Optional[str] = Field(
11111116
None,
11121117
description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.',

0 commit comments

Comments
 (0)