-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RFR for API Sources] New Python interfaces to support resumable full refresh #37429
[RFR for API Sources] New Python interfaces to support resumable full refresh #37429
Conversation
… how they fit into Stream.read_records()
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
@@ -45,10 +52,10 @@ def package_name_from_class(cls: object) -> str: | |||
raise ValueError(f"Could not find package name for class {cls}") | |||
|
|||
|
|||
class IncrementalMixin(ABC): | |||
"""Mixin to make stream incremental. | |||
class StateMixin(ABC): # Rename to CheckpointMixin? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this mixin because state setter/getter are no longer specific to Incremental streams. But to retain backwards compatibility I left the old IncrementalMixin
but it just inherits from this mixin
self._state = new_state | ||
|
||
def get_checkpoint(self) -> Optional[Mapping[str, Any]]: | ||
return self._state or {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we call get_checkpoint()
and the end of the last slice/page and at the end of the sync, we end up emitting the same final state twice. We can potentially insert more fields to track state internally within the reader, but i don't think its worth the hassle
def state(self, value: MutableMapping[str, Any]) -> None: | ||
self._state = value | ||
|
||
def read_records( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left the implementation to get the 2 RFR streams working for Hubspot within the new interfaces to illustrate how to uptake the changes in the CDK.
But before merging I'll remove these from the PR
@@ -0,0 +1,77 @@ | |||
# Resumable Full Refresh Streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@girarda fyi i added a docs for RFR instead of inlining comments for certain methods etc. I still need to proof grammar and clean it up a little but just a heads up since some PR comments called this out
|
||
except AttributeError: | ||
state_manager.update_state_for_stream(self.name, self.namespace, stream_state) | ||
# todo: This can be consolidated into one ConnectorStateManager.update_and_create_state_message() method, but I want |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need a follow up issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep created it here https://github.com/airbytehq/airbyte-internal-issues/issues/7500
@abstractmethod | ||
def get_checkpoint(self) -> Optional[Mapping[str, Any]]: | ||
""" | ||
Retrieves the current state value of the stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment explaining that None
means we stop reading?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you meant this for the checkpointReader.next()
, since None in that case stops parsing, but I'll also comment here that we don't emit state messages if return is None either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is great! approved pending reverting the changes to hubspot
ran a few local |
What
Primarily updates how the Stream class performs a read. The big change is around how we resolve what the next partition of records to retrieve is. Because resumable full refresh operates under the paradigm of an unbounded set of pages (unlike incremental partitioned time windows), we need to change how we determine the next slice.
The primary changes to the flow at a high level are:
stream_slices()
being the mechanism for determining the next partition of records to retrieve. I've introduced the concept of aCheckpointReader
whose type is instantiated based on the Stream's implementationHow
Some of the major design changes in the review are:
CheckpointReader
class which is now the main way a stream determines the next partition of values to read. For incremental this continues to be partitions like time windows. For RFR, this is the next page of records. And for RFR this can be parent records for substreams or a single slice{}
.IncrementalMixin
in favor ofStateMixin
since state is used by RFR streams which are not incremental. This is a better name, but I kept the old one for backwards compatibility reasonsStream.stream_slices()
implementation from [None] to [{}]. None is now the indicator to stop iterating over slicessupports_checkpointing
field to streams. It’s needed for two areas. We need to surface this value to the catalog. AND we need this to be overridable because declarative low-code sources delineation for checkpointing differs from python sources.This PR does not implement the work for a substream that requires resumable full refresh. I have a sketched out interface to see if its possible which it does appear to be. But substream state management for RFR becomes quite convoluted due to the issue I'll go into below.
Making the CDK and
Stream
class read directly from the connector managed stateThis is arguably the most controversial DX and design change compared to before RFR. It has some impact on connector developers. In order for RFR to function using the current
read_records()
method, we need some way of communicating state from the the specific connector implementation back to the CDK. To do so, we expect the developer to manageStream.state
. And with RFR we now read the input to decide what to do next. For example:if state:
{ "pageNumber": 23 }
then,
continue syncing.
if state:
{}
then,
stop syncing no more pages.
This is a relatively simple example, but it does illustrate that the developer needs to know to emit
{}
to stop RFR paging. In hubspot updating state is done withself.state = self.next_page_token(response) or {}
. But it feels like not an ideal precedent that a developer needs to have a general awareness of how the checkpoint reader works in order to successfully implement RFR. And this is coupled with theCheckpointReader
being mostly an internal CDK implementation detail that developers shouldn't need to think of.As mentioned earlier, substream RFR streams which would need per-partition cursors requires much more careful reading of the state object for specific structure which is indicative that state as an unstructured map is not the right data type.
This is just for connectors that implement the legacy Python CDK, and since state is managed internally by low-code connectors, I think its fair to also say from the 80:20 rule, we are aiming for a more ideal interface for low-code which makes up a majority of our connectors vs legacy Python.
Alternative:
I did also look into incremental vs. RFR streams having different versions of the
read_records()
method. For incremental/full refresh, they would have the normalread_records() -> iterable
. And for RFRread_records() -> iterable + updated stream state
. This however felt like a step in the wrong direction because we're moving back into two different runtime flows depending on sync type.A structured
state
class:This feels like the more appropriate long term solution. If we have a state interface that handles interpreting state and explicitly communicate what to do with state and what the CDK should emit back to the platform. This is well outside what we can do for the project, but something that I thought about.
Review guide
checkpoint_reader.py
core.py
declarative_stream.py
abstract_source.py
User Impact
noop
Can this PR be safely reverted and rolled back?