[RFR for API Sources] New Python interfaces to support resumable full refresh #37429
@@ -0,0 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from .checkpoint_reader import CheckpointMode, CheckpointReader, FullRefreshCheckpointReader, IncrementalCheckpointReader, ResumableFullRefreshCheckpointReader

__all__ = ["CheckpointMode", "CheckpointReader", "FullRefreshCheckpointReader", "IncrementalCheckpointReader", "ResumableFullRefreshCheckpointReader"]

@@ -0,0 +1,89 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Iterable, Mapping, MutableMapping, Optional


class CheckpointMode(Enum):
    INCREMENTAL = "incremental"
    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
    FULL_REFRESH = "full_refresh"


class CheckpointReader(ABC):
    @abstractmethod
    def next(self) -> Optional[MutableMapping[str, Any]]:
        """
        Returns the next slice to process
        """

    @abstractmethod
    def observe(self, new_state: Mapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

        WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.
        In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
        """

    @abstractmethod
    def read_state(self) -> MutableMapping[str, Any]:
        """
        This is interesting. With this move, we've turned the checkpoint reader into something that resembles a cursor even more,
        because we act as an intermediary and now regularly assign Stream.state to CheckpointReader._state via observe().
        """


class IncrementalCheckpointReader(CheckpointReader):
    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        self._state = None
        self._stream_slices = iter(stream_slices)

    def next(self) -> Optional[MutableMapping[str, Any]]:
        try:
            return next(self._stream_slices)
        except StopIteration:
            return None

    def observe(self, new_state: Mapping[str, Any]):
        # This is really only needed for backward compatibility with the legacy state management implementations.
        # We only update the underlying _state value for legacy; otherwise, managing state is done by the connector implementation.
        self._state = new_state

    def read_state(self) -> MutableMapping[str, Any]:
        return self._state


class ResumableFullRefreshCheckpointReader(CheckpointReader):
    def __init__(self, stream_state: MutableMapping[str, Any]):
        self._state: Optional[MutableMapping[str, Any]] = stream_state
        # Can we use a dummy value on the first iteration to trigger the loop, and on subsequent iterations see {} and therefore end the loop?

    def next(self) -> Optional[MutableMapping[str, Any]]:
        return self._state

    def observe(self, new_state: Mapping[str, Any]):
        # observe() was originally just for backwards compatibility, but we can potentially fold it further into the read_records()
        # flow as I've coded out so far.
        self._state = new_state

    def read_state(self) -> MutableMapping[str, Any]:
        return self._state or {}


class FullRefreshCheckpointReader(CheckpointReader):
    def __init__(self):
        self._stream_slices = iter([{}])

    def next(self) -> Optional[MutableMapping[str, Any]]:
        try:
            return next(self._stream_slices)
        except StopIteration:
            return None

    def observe(self, new_state: Mapping[str, Any]):
        pass

    def read_state(self) -> MutableMapping[str, Any]:
        return {"__ab_is_sync_complete": True}  # replace this with the new terminal value from ella
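
As context for reviewers, here is a minimal sketch (not part of this PR) of how the new read() loop is meant to drive these readers: next() hands out slices until it returns None, observe() records the latest state, and read_state() is what gets checkpointed to the platform. The drive() helper and the "last_seen" key are illustrative only.

from airbyte_cdk.sources.streams.checkpoint import (
    FullRefreshCheckpointReader,
    IncrementalCheckpointReader,
)


def drive(reader):
    """Mirror of the while-loop in Stream.read(): iterate slices until next() returns None."""
    emitted = []
    next_slice = reader.next()
    while next_slice is not None:
        # read_records(stream_slice=next_slice, ...) would run here
        reader.observe({"last_seen": next_slice})  # stand-in for the stream's real state
        emitted.append(next_slice)
        next_slice = reader.next()
    return emitted, reader.read_state()


# Incremental: one iteration per slice returned by stream_slices()
slices, state = drive(IncrementalCheckpointReader(stream_slices=[{"date": "2024-01-01"}, {"date": "2024-01-02"}]))
print(slices)  # [{'date': '2024-01-01'}, {'date': '2024-01-02'}]
print(state)   # {'last_seen': {'date': '2024-01-02'}}

# Full refresh: exactly one empty slice, then the terminal sentinel used in this draft
slices, state = drive(FullRefreshCheckpointReader())
print(slices)  # [{}]
print(state)   # {'__ab_is_sync_complete': True}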

@@ -13,6 +13,13 @@
import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, SyncMode
from airbyte_cdk.models import Type as MessageType
+from airbyte_cdk.sources.streams.checkpoint import (
+    CheckpointMode,
+    CheckpointReader,
+    FullRefreshCheckpointReader,
+    IncrementalCheckpointReader,
+    ResumableFullRefreshCheckpointReader,
+)

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, ResourceSchemaLoader

@@ -45,10 +52,10 @@ def package_name_from_class(cls: object) -> str:
    raise ValueError(f"Could not find package name for class {cls}")


-class IncrementalMixin(ABC):
-    """Mixin to make stream incremental.
+class StateMixin(ABC):  # Rename to CheckpointMixin?
+    """Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform

-    class IncrementalStream(Stream, IncrementalMixin):
+    class CheckpointedStream(Stream, StateMixin):
        @property
        def state(self):
            return self._state

Review comment: I renamed this mixin because state setter/getter are no longer specific to Incremental streams. But to retain backwards compatibility I left the old …

@@ -79,6 +86,21 @@ def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""


+@deprecated(version="0.x.x", reason="Deprecated in favor of the StateMixin which offers similar functionality")
+class IncrementalMixin(StateMixin, ABC):
+    """Mixin to make stream incremental.
+
+    class IncrementalStream(Stream, IncrementalMixin):
+        @property
+        def state(self):
+            return self._state
+
+        @state.setter
+        def state(self, value):
+            self._state[self.cursor_field] = value[self.cursor_field]
+    """
+
+
class Stream(ABC):
    """
    Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
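
To make the rename concrete, here is a hedged sketch of the kind of stream the state setter/getter is meant to unlock for resumable full refresh: a stream with no cursor field that exposes its pagination offset through state. The SurveyResponses class, the "offset" key, and _fetch_page() are illustrative and not part of this PR; it assumes StateMixin is importable from airbyte_cdk.sources.streams.core as written in this diff.

from typing import Any, Iterable, Mapping, MutableMapping

from airbyte_cdk.sources.streams.core import StateMixin, Stream


class SurveyResponses(StateMixin, Stream):
    primary_key = "id"

    @property
    def state(self) -> MutableMapping[str, Any]:
        # The CDK reads this after each slice and checkpoints it to the platform
        return getattr(self, "_state", {})

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        # On a resumed sync, the platform hands the last checkpointed value back to us
        self._state = value

    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None) -> Iterable[Mapping[str, Any]]:
        offset = self.state.get("offset", 0)
        records = self._fetch_page(offset)  # hypothetical API call
        yield from records
        # Advance the offset so the next attempt resumes here. How a stream signals that
        # pagination is finished (empty dict, None, or a terminal value) is still an open
        # question in this draft.
        self.state = {"offset": offset + len(records)}

    def _fetch_page(self, offset: int) -> list:
        return []  # placeholder for a real HTTP request

Because the stream manages state but declares no cursor_field, the _checkpoint_mode property introduced further down in this diff would resolve it to RESUMABLE_FULL_REFRESH.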

@@ -123,22 +145,36 @@ def read(  # type: ignore  # ignoring typing for ConnectorStateManager because o
        sync_mode = configured_stream.sync_mode
        cursor_field = configured_stream.cursor_field

-        slices = self.stream_slices(
-            cursor_field=cursor_field,
-            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
-            stream_state=stream_state,
-        )
-        logger.debug(f"Processing stream slices for {self.name} (sync_mode: {sync_mode.name})", extra={"stream_slices": slices})
+        checkpoint_mode = self._checkpoint_mode
+        if checkpoint_mode == CheckpointMode.INCREMENTAL:
+            slices = self.stream_slices(
+                cursor_field=cursor_field,
+                sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
+                stream_state=stream_state,
+            )
+            # yikes, more backwards compatibility cruft. Because of poor foresight, we wrote the default stream_slices() method to return
+            # [None], which is not only confusing, but we've now normalized this behavior for connector developers. Now we have connectors
+            # that also return [None]. This is objectively misleading and the ideal interface is to return [{}] to indicate we still want
+            # to iterate a first slice, but with no specific slice values. None is bad, and now I feel bad that I have to write this hack.
+            if slices == [None]:
+                slices = [{}]
+            logger.debug(f"Processing stream slices for {self.name} (sync_mode: {sync_mode.name})", extra={"stream_slices": slices})
+            checkpoint_reader = IncrementalCheckpointReader(stream_slices=slices)
+        elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
+            checkpoint_reader = ResumableFullRefreshCheckpointReader(stream_state=stream_state)
+        else:
+            checkpoint_reader = FullRefreshCheckpointReader()

        has_slices = False
+        next_slice = checkpoint_reader.next()
        record_counter = 0
-        for _slice in slices:
+        while next_slice is not None:
            has_slices = True
            if slice_logger.should_log_slice_message(logger):
-                yield slice_logger.create_slice_log_message(_slice)
+                yield slice_logger.create_slice_log_message(next_slice)
            records = self.read_records(
                sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
-                stream_slice=_slice,
+                stream_slice=next_slice,
                stream_state=stream_state,
                cursor_field=cursor_field or None,
            )
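
For readers skimming the diff, the branching added at the top of read() boils down to the following condensed sketch; build_checkpoint_reader is not a real helper in this PR, just an illustration of the dispatch.

from airbyte_cdk.sources.streams.checkpoint import (
    CheckpointMode,
    CheckpointReader,
    FullRefreshCheckpointReader,
    IncrementalCheckpointReader,
    ResumableFullRefreshCheckpointReader,
)


def build_checkpoint_reader(mode: CheckpointMode, slices: list, stream_state: dict) -> CheckpointReader:
    if mode == CheckpointMode.INCREMENTAL:
        # Normalize the legacy [None] default to a single empty slice
        if slices == [None]:
            slices = [{}]
        return IncrementalCheckpointReader(stream_slices=slices)
    if mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
        # Resume from whatever state was checkpointed on the previous attempt
        return ResumableFullRefreshCheckpointReader(stream_state=stream_state)
    # Plain full refresh: a single empty slice, then a terminal state at the end
    return FullRefreshCheckpointReader()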

@@ -148,37 +184,40 @@ def read(  # type: ignore  # ignoring typing for ConnectorStateManager because o
                    hasattr(record_data_or_message, "type") and record_data_or_message.type == MessageType.RECORD
                ):
                    record_data = record_data_or_message if isinstance(record_data_or_message, Mapping) else record_data_or_message.record
+
+                    # BL: Thanks, I hate it. RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state()
+                    # method because RFR streams rely on pagination as a cursor, and get_updated_state() was designed to have
+                    # the CDK manage state using specifically the last seen record.
+
+                    # todo QUESTION: Although this only affects the incremental legacy case, and we still have cursor protection,
+                    # if we split observe into observe_slice vs observe_record then we could get rid of this condition entirely, assuming
+                    # the bug below is fixed
                    if self.cursor_field:
                        # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
                        # should be fixed on the stream implementation, but we should also protect against this in the CDK as well
-                        stream_state = self.get_updated_state(stream_state, record_data)
+                        self._observe_state_wrapper(checkpoint_reader, self.get_updated_state(stream_state, record_data))
                    record_counter += 1

                    if sync_mode == SyncMode.incremental:
                        # Checkpoint intervals are a bit controversial, but see below comment about why we're gating it right now
                        checkpoint_interval = self.state_checkpoint_interval
                        if checkpoint_interval and record_counter % checkpoint_interval == 0:
-                            airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
+                            airbyte_state_message = self._new_checkpoint_state(
+                                checkpoint_reader=checkpoint_reader, state_manager=state_manager
+                            )
                            yield airbyte_state_message

                    if internal_config.is_limit_reached(record_counter):
                        break
+            self._observe_state_wrapper(checkpoint_reader)
+            airbyte_state_message = self._new_checkpoint_state(checkpoint_reader=checkpoint_reader, state_manager=state_manager)
+            # airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
+            yield airbyte_state_message
+
+            next_slice = checkpoint_reader.next()

-            if sync_mode == SyncMode.incremental:
-                # Even though right now, only incremental streams running as incremental mode will emit periodic checkpoints. Rather than
-                # overhaul how refresh interacts with the platform, this positions the code so that once we want to start emitting
-                # periodic checkpoints in full refresh mode it can be done here
-                airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
-                yield airbyte_state_message

-        if not has_slices or sync_mode == SyncMode.full_refresh:
-            if sync_mode == SyncMode.full_refresh:
-                # We use a dummy state if there is no suitable value provided by full_refresh streams that do not have a valid cursor.
-                # Incremental streams running full_refresh mode emit a meaningful state
-                stream_state = stream_state or {FULL_REFRESH_SENTINEL_STATE_KEY: True}
-
-            # We should always emit a final state message for full refresh sync or streams that do not have any slices
-            airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
+        if not has_slices:
+            airbyte_state_message = self._new_checkpoint_state(checkpoint_reader=checkpoint_reader, state_manager=state_manager)
            yield airbyte_state_message

    @abstractmethod

@@ -210,7 +249,8 @@ def as_airbyte_stream(self) -> AirbyteStream:
        if self.namespace:
            stream.namespace = self.namespace

-        if self.supports_incremental:
+        # If we can offer incremental we always should. RFR is always less reliable than incremental, which uses a real cursor value
+        if not self.supports_resumable_full_refresh and self.supports_incremental:
            stream.source_defined_cursor = self.source_defined_cursor
            stream.supported_sync_modes.append(SyncMode.incremental)  # type: ignore
            stream.default_cursor_field = self._wrapped_cursor_field()

@@ -295,7 +335,7 @@
        :param stream_state:
        :return:
        """
-        return [None]
+        return [{}]  # I really hate that this used to be None, which was just cruft to get the first loop in

    @property
    def state_checkpoint_interval(self) -> Optional[int]:

Review comment: I agree. Do you want to keep the comment in code for posterity, or can we remove it?
Review comment: Removing, as I think it's understandable that {} == an empty single slice.

@@ -328,6 +368,24 @@ def get_updated_state(
        """
        return {}

+    @property
+    def _supports_state(self) -> bool:
+        if hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
+            # Modern case where a stream manages state using getter/setter
+            return True
+        else:
+            # Legacy case where the CDK manages state via the get_updated_state method
+            return type(self).get_updated_state != Stream.get_updated_state
+
+    @property
+    def _checkpoint_mode(self) -> CheckpointMode:
+        if self._supports_state and len(self._wrapped_cursor_field()) > 0:
+            return CheckpointMode.INCREMENTAL
+        elif self._supports_state:
+            return CheckpointMode.RESUMABLE_FULL_REFRESH
+        else:
+            return CheckpointMode.FULL_REFRESH
+
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the configuration of this stream.
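
A hedged illustration (hypothetical stream classes, assuming the _checkpoint_mode property above) of how the two signals combine: whether the stream manages its own state, and whether it declares a cursor field.

from typing import Any, Iterable, Mapping, MutableMapping

from airbyte_cdk.sources.streams.core import Stream


class PlainStream(Stream):
    """No state property and no get_updated_state override -> CheckpointMode.FULL_REFRESH."""

    primary_key = None

    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None) -> Iterable[Mapping[str, Any]]:
        return []


class PagedStream(PlainStream):
    """State setter/getter but an empty cursor_field -> CheckpointMode.RESUMABLE_FULL_REFRESH."""

    @property
    def state(self) -> MutableMapping[str, Any]:
        return getattr(self, "_state", {})

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        self._state = value


class UpdatedAtStream(PagedStream):
    """State setter/getter plus a cursor field -> CheckpointMode.INCREMENTAL."""

    cursor_field = "updated_at"


for stream in (PlainStream(), PagedStream(), UpdatedAtStream()):
    print(type(stream).__name__, stream._checkpoint_mode)
# PlainStream CheckpointMode.FULL_REFRESH
# PagedStream CheckpointMode.RESUMABLE_FULL_REFRESH
# UpdatedAtStream CheckpointMode.INCREMENTAL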

@@ -379,3 +437,34 @@ def _checkpoint_state(  # type: ignore  # ignoring typing for ConnectorStateMana
        except AttributeError:
            state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
        return state_manager.create_state_message(self.name, self.namespace)
+
+    # Below is the one part of the interface that I'm a little iffy about, because checkpoint_reader and the connector state manager
+    # feel like they do very similar things. If they were combined it would be cool, but it's not something that is show-stopping.
+
+    def _observe_state_wrapper(
+        self,
+        checkpoint_reader: CheckpointReader,
+        stream_state: Optional[Mapping[str, Any]] = None,
+    ):
+        # todo: BL some of this makes me feel like the checkpoint_reader feels eerily similar to our existing
+        # connector state manager. I wonder if it's realistic to combine these two concepts into one?
+
+        # Convenience method that attempts to read the Stream's state using the recommended way of connectors managing their
+        # own state via the state setter/getter. But if we get back an AttributeError, then the legacy Stream.get_updated_state()
+        # method is used as a fallback.
+        try:
+            new_state = self.state  # type: ignore  # we know the field might not exist...
+        except AttributeError:
+            new_state = stream_state
+        checkpoint_reader.observe(new_state)
+
+    def _new_checkpoint_state(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
+        self,
+        checkpoint_reader: CheckpointReader,
+        state_manager,
+    ) -> AirbyteMessage:
+        # todo: BL some of this makes me feel like the checkpoint_reader feels eerily similar to our existing
+        # connector state manager. I wonder if it's realistic to combine these two concepts into one?
+
+        state_manager.update_state_for_stream(self.name, self.namespace, checkpoint_reader.read_state())
+        return state_manager.create_state_message(self.name, self.namespace)

@@ -0,0 +1,34 @@
{
  "streams": [
    {
      "stream": {
        "name": "email_subscriptions",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "companies",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": ["updatedAt"]
      },
      "sync_mode": "incremental",
      "cursor_field": ["updatedAt"],
      "destination_sync_mode": "append"
    },
    {
      "stream": {
        "name": "contacts_form_submissions",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}

@@ -0,0 +1,9 @@
[
  {
    "type": "STREAM",
    "stream": {
      "stream_state": { "vidOffset": 5345144171 },
      "stream_descriptor": { "name": "contacts_form_submissions" }
    }
  }
]
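
This fixture captures the shape of a resumable full refresh checkpoint: a pagination offset rather than a record cursor. Below is a hedged sketch of how a stream could consume it to resume mid-sync; the ContactsFormSubmissions class and the response field names are illustrative, and only the {"vidOffset": ...} state shape comes from this fixture.

from typing import Any, Iterable, Mapping, MutableMapping, Optional


class ContactsFormSubmissions:
    """Stripped-down stand-in for a stream that resumes pagination from checkpointed state."""

    def __init__(self, state: Optional[MutableMapping[str, Any]] = None):
        self._state: MutableMapping[str, Any] = state or {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        self._state = value

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        # Resume from the checkpointed offset instead of the first page
        offset = self._state.get("vidOffset")
        response = self._request(params={"vidOffset": offset} if offset else {})
        yield from response.get("contacts", [])
        # Persist the next offset so the platform can checkpoint it between attempts
        next_offset = response.get("vid-offset")
        self._state = {"vidOffset": next_offset} if response.get("has-more") else {}

    def _request(self, params: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"contacts": [], "has-more": False}  # placeholder for a real HTTP call


# Resuming a previously interrupted sync with the state from this fixture:
stream = ContactsFormSubmissions(state={"vidOffset": 5345144171})
records = list(stream.read_records())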