-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[RFR for API Sources] New Python interfaces to support resumable full refresh #37429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
45be2a9
3aacffe
67582c2
f0af212
2d69903
5484d39
45ce52d
5f488ab
2169498
6c2d596
f60a0b7
1b18edb
514fb3f
a39bcc6
1e54edc
cc76250
12f1789
28564e4
94bc5be
8ddae37
52b1a5e
e0e88fb
e976383
0af07eb
8bda880
87fd05e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
|
||
|
||
from .checkpoint_reader import CheckpointMode, CheckpointReader, FullRefreshCheckpointReader, IncrementalCheckpointReader, ResumableFullRefreshCheckpointReader | ||
|
||
__all__ = ["CheckpointMode", "CheckpointReader", "FullRefreshCheckpointReader", "IncrementalCheckpointReader", "ResumableFullRefreshCheckpointReader"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
|
||
from abc import ABC, abstractmethod | ||
from enum import Enum | ||
from typing import Any, Iterable, Mapping, Optional | ||
|
||
|
||
class CheckpointMode(Enum):
    """Enumeration of the checkpoint modes: incremental, resumable full refresh and full refresh."""

    INCREMENTAL = "incremental"
    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
    FULL_REFRESH = "full_refresh"
|
||
|
||
class CheckpointReader(ABC):
    """
    CheckpointReader manages how to iterate over a stream's partitions and serves as the bridge for interpreting the current state
    of the stream that should be emitted back to the platform.
    """

    @abstractmethod
    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Returns the next slice that will be used to fetch the next group of records, or None when there is nothing
        left to read.
        """

    @abstractmethod
    def observe(self, new_state: Mapping[str, Any]) -> None:
        """
        Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

        WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.

        In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.

        :param new_state: the latest stream state reported by the connector
        """
        # todo blai: Ideally observe and get_checkpoint should just be one method, but because of the legacy state behavior
        #  observation and reading Stream.state checkpoint are not 1:1 with each other

    @abstractmethod
    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """
        Retrieves the current state value of the stream.

        The return type is Optional: implementations may return None when there is no state to report.
        """
|
||
|
||
class IncrementalCheckpointReader(CheckpointReader):
    """
    IncrementalCheckpointReader handles iterating through a stream based on partitioned windows of data that are determined
    before syncing data.
    """

    def __init__(self, stream_state: Mapping[str, Any], stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        """
        :param stream_state: the state the sync starts from; returned by get_checkpoint() until observe() replaces it
        :param stream_slices: the slices to iterate over; consumed lazily, one per call to next()
        """
        self._state: Optional[Mapping[str, Any]] = stream_state
        self._stream_slices = iter(stream_slices)
        # Flipped to True once at least one slice has been handed out by next()
        self._has_slices = False

    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Returns the next slice, or None once the slices are exhausted.

        NOTE(review): stream_slices may itself yield None, which callers cannot tell apart from exhaustion by the
        return value alone - confirm callers account for this.
        """
        try:
            next_slice = next(self._stream_slices)
        except StopIteration:
            # This is used to avoid sending a duplicate state message at the end of a sync since the stream has already
            # emitted state at the end of each slice. If we want to avoid this extra complexity, we can also just accept
            # that every sync emits a final duplicate state
            if self._has_slices:
                self._state = None
            return None
        self._has_slices = True
        return next_slice

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Replaces the tracked state with the state reported by the connector."""
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Returns the tracked state; None after next() has exhausted a non-empty set of slices."""
        return self._state
|
||
|
||
class ResumableFullRefreshCheckpointReader(CheckpointReader):
    """
    ResumableFullRefreshCheckpointReader allows for iteration over an unbounded set of records based on the pagination strategy
    of the stream. Because the number of pages is unknown, the stream's current state is used to determine whether to continue
    fetching more pages or stopping the sync.

    Expected shape of the state object, as interpreted by this reader:
      - An empty mapping `{}` passed to the constructor means this is the first attempt; next() still returns it once so
        that the first page is read.
      - A non-empty mapping is handed back by next() as the slice describing the next page to fetch.
      - An empty mapping `{}` observed after the first page means there are no more pages and next() returns None.
    """

    def __init__(self, stream_state: Mapping[str, Any]):
        """
        :param stream_state: the incoming state of the stream; `{}` on a first attempt
        """
        # The first attempt of an RFR stream has an empty {} incoming state, but should still make a first attempt to read records
        # from the first page in next().
        self._first_page = stream_state == {}
        self._state: Mapping[str, Any] = stream_state

    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Returns the current state as the next slice to read, or None once the state has become empty after the first page.
        """
        # todo blai: I think this is my main concern with the interface is that it puts a lot of onus on the connector developer to
        #  structure their state object correctly to coincide with how the checkpoint_reader interprets values.
        # NOTE(review): per the review discussion, requiring connector developers to be state-aware is accepted for now; a
        #  structured state class or a cursor class was suggested as the longer term solution.
        if self._first_page:
            self._first_page = False
            return self._state
        if self._state == {}:
            return None
        return self._state

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Replaces the tracked state with the state reported by the connector."""
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Returns the tracked state, substituting `{}` when the tracked state is falsy (e.g. None)."""
        return self._state or {}
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we call |
||
|
||
|
||
class FullRefreshCheckpointReader(CheckpointReader):
    """
    FullRefreshCheckpointReader iterates over data that cannot be checkpointed incrementally during the sync because the stream
    is not capable of managing state. At the end of a sync, a final state message is emitted to signal completion.

    NOTE(review): per the review discussion, checkpoint readers are not intended to be configured directly by connector
    developers.
    """

    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        """
        :param stream_slices: the slices to iterate over; consumed lazily, one per call to next()
        """
        self._stream_slices = iter(stream_slices)
        # Set once next() has exhausted the slices; gates the final state message in get_checkpoint()
        self._final_checkpoint = False

    def next(self) -> Optional[Mapping[str, Any]]:
        """Returns the next slice, or None once the slices are exhausted."""
        try:
            return next(self._stream_slices)
        except StopIteration:
            self._final_checkpoint = True
            return None

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Ignores the incoming state; this reader does not track stream state."""
        pass

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Returns None until the slices are exhausted, then the final no-cursor state message."""
        if self._final_checkpoint:
            return {"__ab_no_cursor_state_message": True}
        return None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intentionally changed this so we are no longer checking if `stream_state` is truthy. I'm not sure whether it was a prior bug, but a problem I noticed in testing was a side effect of running `check_availability()`. Because `check_availability()` runs before a read, for streams implementing the state getter/setter, the stream state might get updated during `check_availability()` and the sync can accidentally use the wrong state.
For incremental streams running for the first time, where the incoming state value was `{}`, we might miss data if check availability set the state. This change makes it so we always use exactly the state that was passed in as input at the start of the sync, whether it be `{}` or `{"updated_at": "2024-04-24"}`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yikes. good catch