[RFR for API Sources] New Python interfaces to support resumable full refresh #37429

Merged · 26 commits · May 6, 2024
Changes from 3 commits

Commits (26)
45be2a9
initial checkpoint reader interface/implementations and sketching out…
brianjlai Apr 9, 2024
3aacffe
cleaning up and simplifying checkpoint reader interface
brianjlai Apr 12, 2024
67582c2
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai Apr 18, 2024
f0af212
clean up interfaces, fix tests, and add new tests across all layers
brianjlai Apr 19, 2024
2d69903
fix ci issues and clean out the code a bit
brianjlai Apr 22, 2024
5484d39
format
brianjlai Apr 22, 2024
45ce52d
break apart cdk mock server tests into smaller test files and move he…
brianjlai Apr 22, 2024
5f488ab
better incremental state management code and more mock server tests
brianjlai Apr 23, 2024
2169498
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai Apr 23, 2024
6c2d596
remove sync_mode from checkpointer and stream read logic
brianjlai Apr 23, 2024
f60a0b7
remove get_final_checkpoint and update tests and simplify Stream read…
brianjlai Apr 24, 2024
1b18edb
fix mypy
brianjlai Apr 24, 2024
514fb3f
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai Apr 24, 2024
a39bcc6
fix tests and when instantiating checkpoint reader use self.state get…
brianjlai Apr 24, 2024
1e54edc
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai Apr 24, 2024
cc76250
don't perform RFR for http substreams
brianjlai Apr 25, 2024
12f1789
update PR based on feedback and add docs page for RFR
brianjlai Apr 30, 2024
28564e4
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai Apr 30, 2024
94bc5be
fix tests and polish documentation a bit
brianjlai Apr 30, 2024
8ddae37
pr feedback and fix bug with empty {} for declarative streams
brianjlai May 1, 2024
52b1a5e
adding some code to trigger full ci run for a few test connectors and…
brianjlai May 3, 2024
e0e88fb
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai May 3, 2024
e976383
remove connector code after running tests against a few different con…
brianjlai May 4, 2024
0af07eb
last small changes before merge
brianjlai May 6, 2024
8bda880
Merge branch 'master' into resumable_full_refresh_python_cdk_new_inte…
brianjlai May 6, 2024
87fd05e
format -_-
brianjlai May 6, 2024
8 changes: 1 addition & 7 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -211,13 +211,7 @@ def _read_stream(
stream_instance.log_stream_sync_configuration()

stream_name = configured_stream.stream.name
# The platform always passes stream state regardless of sync mode. We shouldn't need to consider this case within the
# connector, but right now we need to prevent accidental usage of the previous stream state
stream_state = (
state_manager.get_stream_state(stream_name, stream_instance.namespace)
if configured_stream.sync_mode == SyncMode.incremental
else {}
)
stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)

if stream_state and "state" in dir(stream_instance) and not self._stream_state_is_full_refresh(stream_state):
stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance)
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/__init__.py
@@ -3,6 +3,6 @@
#

# Initialize Streams Package
from .core import FULL_REFRESH_SENTINEL_STATE_KEY, IncrementalMixin, Stream
from .core import FULL_REFRESH_SENTINEL_STATE_KEY, IncrementalMixin, StateMixin, Stream

__all__ = ["FULL_REFRESH_SENTINEL_STATE_KEY", "IncrementalMixin", "Stream"]
__all__ = ["FULL_REFRESH_SENTINEL_STATE_KEY", "IncrementalMixin", "StateMixin", "Stream"]
6 changes: 6 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/checkpoint/__init__.py
@@ -0,0 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from .checkpoint_reader import CheckpointMode, CheckpointReader, FullRefreshCheckpointReader, IncrementalCheckpointReader, ResumableFullRefreshCheckpointReader

__all__ = ["CheckpointMode", "CheckpointReader", "FullRefreshCheckpointReader", "IncrementalCheckpointReader", "ResumableFullRefreshCheckpointReader"]
89 changes: 89 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/checkpoint/checkpoint_reader.py
@@ -0,0 +1,89 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Iterable, Mapping, MutableMapping, Optional


class CheckpointMode(Enum):
INCREMENTAL = "incremental"
RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
FULL_REFRESH = "full_refresh"


class CheckpointReader(ABC):
@abstractmethod
def next(self) -> Optional[MutableMapping[str, Any]]:
"""
Returns the next slice to process
"""

@abstractmethod
def observe(self, new_state: Mapping[str, Any]) -> MutableMapping[str, Any]:
"""
Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.
In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
"""

@abstractmethod
def read_state(self) -> MutableMapping[str, Any]:
"""
Returns the current state maintained by the checkpoint reader. This is interesting: with this move, the checkpoint reader
resembles a cursor even more, because it acts as an intermediary that is regularly assigned Stream.state into CheckpointReader._state via observe()
"""


class IncrementalCheckpointReader(CheckpointReader):
def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
self._state = None
self._stream_slices = iter(stream_slices)

def next(self) -> Optional[MutableMapping[str, Any]]:
try:
return next(self._stream_slices)
except StopIteration:
return None

def observe(self, new_state: Mapping[str, Any]):
# This is really only needed for backward compatibility with the legacy state management implementations.
# We only update the underlying _state value for legacy implementations; otherwise, state management is handled by the connector itself
self._state = new_state

def read_state(self) -> MutableMapping[str, Any]:
return self._state


class ResumableFullRefreshCheckpointReader(CheckpointReader):
def __init__(self, stream_state: MutableMapping[str, Any]):
self._state: Optional[MutableMapping[str, Any]] = stream_state
# can I use a dummy value for the first iteration to trigger the loop, so that on subsequent ones we see {} and therefore end the loop?

def next(self) -> Optional[MutableMapping[str, Any]]:
return self._state

def observe(self, new_state: Mapping[str, Any]):
# observe() was originally just for backwards compatibility, but we can potentially fold it further into the read_records()
# flow as I've coded out so far.
self._state = new_state

def read_state(self) -> MutableMapping[str, Any]:
return self._state or {}
brianjlai (Contributor, Author) commented:

Because we call get_checkpoint() at the end of the last slice/page and again at the end of the sync, we end up emitting the same final state twice. We could potentially add more fields to track state internally within the reader, but I don't think it's worth the hassle.



class FullRefreshCheckpointReader(CheckpointReader):
def __init__(self):
self._stream_slices = iter([{}])

def next(self) -> Optional[MutableMapping[str, Any]]:
try:
return next(self._stream_slices)
except StopIteration:
return None

def observe(self, new_state: Mapping[str, Any]):
pass

def read_state(self) -> MutableMapping[str, Any]:
return {"__ab_is_sync_complete": True} # replace this with the new terminal value from ella
151 changes: 120 additions & 31 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
@@ -13,6 +13,13 @@
import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.checkpoint import (
CheckpointMode,
CheckpointReader,
FullRefreshCheckpointReader,
IncrementalCheckpointReader,
ResumableFullRefreshCheckpointReader,
)

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, ResourceSchemaLoader
@@ -45,10 +52,10 @@ def package_name_from_class(cls: object) -> str:
raise ValueError(f"Could not find package name for class {cls}")


class IncrementalMixin(ABC):
"""Mixin to make stream incremental.
class StateMixin(ABC): # Rename to CheckpointMixin?
brianjlai (Contributor, Author) commented:

I renamed this mixin because the state setter/getter are no longer specific to incremental streams. To retain backwards compatibility, I left the old IncrementalMixin in place, but it now just inherits from this mixin.

"""Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform

class IncrementalStream(Stream, IncrementalMixin):
class CheckpointedStream(Stream, StateMixin):
@property
def state(self):
return self._state
@@ -79,6 +86,21 @@ def state(self, value: MutableMapping[str, Any]) -> None:
"""State setter, accept state serialized by state getter."""


@deprecated(version="0.x.x", reason="Deprecated in favor of the StateMixin which offers similar functionality")
class IncrementalMixin(StateMixin, ABC):
"""Mixin to make stream incremental.

class IncrementalStream(Stream, IncrementalMixin):
@property
def state(self):
return self._state

@state.setter
def state(self, value):
self._state[self.cursor_field] = value[self.cursor_field]
"""


class Stream(ABC):
"""
Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
@@ -123,22 +145,36 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o
sync_mode = configured_stream.sync_mode
cursor_field = configured_stream.cursor_field

slices = self.stream_slices(
cursor_field=cursor_field,
sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior
stream_state=stream_state,
)
logger.debug(f"Processing stream slices for {self.name} (sync_mode: {sync_mode.name})", extra={"stream_slices": slices})
checkpoint_mode = self._checkpoint_mode
if checkpoint_mode == CheckpointMode.INCREMENTAL:
slices = self.stream_slices(
cursor_field=cursor_field,
sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior
stream_state=stream_state,
)
# yikes, more backwards compatibility cruft. Because of poor foresight, we wrote the default stream_slices() method to return
# [None], which is not only confusing, but we've now normalized this behavior for connector developers. Now we have connectors
# that also return [None]. This is objectively misleading and the ideal interface is to return [{}] to indicate we still want
# to iterate a first slice, but with no specific slice values. None is bad, and now I feel bad that I have to write this hack
if slices == [None]:
slices = [{}]
logger.debug(f"Processing stream slices for {self.name} (sync_mode: {sync_mode.name})", extra={"stream_slices": slices})
checkpoint_reader = IncrementalCheckpointReader(stream_slices=slices)
elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
checkpoint_reader = ResumableFullRefreshCheckpointReader(stream_state=stream_state)
else:
checkpoint_reader = FullRefreshCheckpointReader()

has_slices = False
next_slice = checkpoint_reader.next()
record_counter = 0
for _slice in slices:
while next_slice is not None:
has_slices = True
if slice_logger.should_log_slice_message(logger):
yield slice_logger.create_slice_log_message(_slice)
yield slice_logger.create_slice_log_message(next_slice)
records = self.read_records(
sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior
stream_slice=_slice,
stream_slice=next_slice,
stream_state=stream_state,
cursor_field=cursor_field or None,
)
@@ -148,37 +184,40 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o
hasattr(record_data_or_message, "type") and record_data_or_message.type == MessageType.RECORD
):
record_data = record_data_or_message if isinstance(record_data_or_message, Mapping) else record_data_or_message.record

# BL: Thanks I hate it. RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state()
# method because RFR streams rely on pagination as a cursor and get_updated_state() was designed to have
# the CDK manage state using specifically the last seen record.

# todo QUESTION: Although this only affects the incremental legacy case, and we still have cursor protection,
# if we split observe into observe_slice vs observe_record then we could get rid of this condition entirely assuming
# the bug below is fixed
if self.cursor_field:
# Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
# should be fixed on the stream implementation, but we should also protect against this in the CDK as well
stream_state = self.get_updated_state(stream_state, record_data)
self._observe_state_wrapper(checkpoint_reader, self.get_updated_state(stream_state, record_data))
record_counter += 1

if sync_mode == SyncMode.incremental:
# Checkpoint intervals are a bit controversial, but see below comment about why we're gating it right now
checkpoint_interval = self.state_checkpoint_interval
if checkpoint_interval and record_counter % checkpoint_interval == 0:
airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
airbyte_state_message = self._new_checkpoint_state(
checkpoint_reader=checkpoint_reader, state_manager=state_manager
)
yield airbyte_state_message

if internal_config.is_limit_reached(record_counter):
break
self._observe_state_wrapper(checkpoint_reader)
airbyte_state_message = self._new_checkpoint_state(checkpoint_reader=checkpoint_reader, state_manager=state_manager)
# airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
yield airbyte_state_message

next_slice = checkpoint_reader.next()

if sync_mode == SyncMode.incremental:
# Right now, only incremental streams running in incremental mode emit periodic checkpoints. Rather than
# overhaul how full refresh interacts with the platform, this positions the code so that once we want to start emitting
# periodic checkpoints in full refresh mode, it can be done here
airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
yield airbyte_state_message

if not has_slices or sync_mode == SyncMode.full_refresh:
if sync_mode == SyncMode.full_refresh:
# We use a dummy state if there is no suitable value provided by full_refresh streams that do not have a valid cursor.
# Incremental streams running full_refresh mode emit a meaningful state
stream_state = stream_state or {FULL_REFRESH_SENTINEL_STATE_KEY: True}

# We should always emit a final state message for full refresh sync or streams that do not have any slices
airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
if not has_slices:
airbyte_state_message = self._new_checkpoint_state(checkpoint_reader=checkpoint_reader, state_manager=state_manager)
yield airbyte_state_message

@abstractmethod
@@ -210,7 +249,8 @@ def as_airbyte_stream(self) -> AirbyteStream:
if self.namespace:
stream.namespace = self.namespace

if self.supports_incremental:
# If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value
if not self.supports_resumable_full_refresh and self.supports_incremental:
stream.source_defined_cursor = self.source_defined_cursor
stream.supported_sync_modes.append(SyncMode.incremental) # type: ignore
stream.default_cursor_field = self._wrapped_cursor_field()
@@ -295,7 +335,7 @@ def stream_slices(
:param stream_state:
:return:
"""
return [None]
return [{}] # I really hate that this used to be None, which was just cruft to get the first loop in
Contributor (reviewer) commented:

I agree. Do you want to keep the comment in code for posterity or can we remove it?

brianjlai (Contributor, Author) replied, Apr 30, 2024:

Removing it, as I think it's understandable that {} == an empty single slice.


@property
def state_checkpoint_interval(self) -> Optional[int]:
@@ -328,6 +368,24 @@ def get_updated_state(
"""
return {}

@property
def _supports_state(self) -> bool:
if hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
# Modern case where a stream manages state using getter/setter
return True
else:
# Legacy case where the CDK manages state via the get_updated_state method
return type(self).get_updated_state != Stream.get_updated_state

@property
def _checkpoint_mode(self) -> CheckpointMode:
if self._supports_state and len(self._wrapped_cursor_field()) > 0:
return CheckpointMode.INCREMENTAL
elif self._supports_state:
return CheckpointMode.RESUMABLE_FULL_REFRESH
else:
return CheckpointMode.FULL_REFRESH

def log_stream_sync_configuration(self) -> None:
"""
Logs the configuration of this stream.
@@ -379,3 +437,34 @@ def _checkpoint_state( # type: ignore # ignoring typing for ConnectorStateMana
except AttributeError:
state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
return state_manager.create_state_message(self.name, self.namespace)

# Below is the one part of the interface that I'm a little iffy about, because the checkpoint_reader and the connector state manager
# feel like they do very similar things. If they were combined it would be cool, but it's not a showstopper

def _observe_state_wrapper(
self,
checkpoint_reader: CheckpointReader,
stream_state: Optional[Mapping[str, Any]] = None,
):
# todo: BL the checkpoint_reader feels eerily similar to our existing
# connector state manager. I wonder if it's realistic to combine these two concepts into one?

# Convenience method that attempts to read the Stream's state using the recommended approach of connectors managing their
# own state via the state setter/getter. If we get back an AttributeError, then the legacy Stream.get_updated_state()
# method is used as a fallback.
try:
new_state = self.state # type: ignore # we know the field might not exist...
except AttributeError:
new_state = stream_state
checkpoint_reader.observe(new_state)

def _new_checkpoint_state( # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies
self,
checkpoint_reader: CheckpointReader,
state_manager,
) -> AirbyteMessage:
# todo: BL the checkpoint_reader feels eerily similar to our existing
# connector state manager. I wonder if it's realistic to combine these two concepts into one?

state_manager.update_state_for_stream(self.name, self.namespace, checkpoint_reader.read_state())
return state_manager.create_state_message(self.name, self.namespace)
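
Putting the core.py pieces together, a stream opts into the new checkpointing behavior by exposing the state property from StateMixin; _checkpoint_mode then decides between INCREMENTAL (cursor field present), RESUMABLE_FULL_REFRESH (state support but no cursor), and FULL_REFRESH. A minimal sketch follows; the stream class, its records, and the page_token key are illustrative stand-ins rather than code from this PR.

    from typing import Any, Iterable, Mapping, MutableMapping, Optional

    from airbyte_cdk.models import SyncMode
    from airbyte_cdk.sources.streams import StateMixin, Stream


    class SketchPaginatedStream(Stream, StateMixin):
        """Illustrative stream: no cursor_field, but it implements the state setter/getter,
        so _checkpoint_mode resolves to CheckpointMode.RESUMABLE_FULL_REFRESH."""

        primary_key = "id"

        def __init__(self) -> None:
            self._state: MutableMapping[str, Any] = {}

        @property
        def state(self) -> MutableMapping[str, Any]:
            return self._state

        @state.setter
        def state(self, value: MutableMapping[str, Any]) -> None:
            self._state = value

        def read_records(
            self,
            sync_mode: SyncMode,
            cursor_field: Optional[list] = None,
            stream_slice: Optional[Mapping[str, Any]] = None,
            stream_state: Optional[Mapping[str, Any]] = None,
        ) -> Iterable[Mapping[str, Any]]:
            # A real implementation would request the page indicated by self.state and then
            # store the next page token so the sync can resume from it if interrupted.
            yield {"id": 1}
            self.state = {"page_token": "abc"}  # hypothetical pagination cursor

With a non-empty cursor_field and the same setter/getter, _checkpoint_mode would return CheckpointMode.INCREMENTAL instead; without a state setter (or an overridden get_updated_state), it falls back to FULL_REFRESH.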
New file (configured catalog test fixture)
@@ -0,0 +1,34 @@
{
"streams": [
{
"stream": {
"name": "email_subscriptions",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "companies",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "contacts_form_submissions",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
New file (incoming stream state test fixture)
@@ -0,0 +1,9 @@
[
{
"type": "STREAM",
"stream": {
"stream_state": { "vidOffset": 5345144171 },
"stream_descriptor": { "name": "contacts_form_submissions" }
}
}
]
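
That state fixture is the shape of input a resumable full refresh stream resumes from; seeded into the ResumableFullRefreshCheckpointReader introduced above, the first slice handed back is simply the saved offset. A tiny illustrative check, not part of the test code:

    from airbyte_cdk.sources.streams.checkpoint import ResumableFullRefreshCheckpointReader

    reader = ResumableFullRefreshCheckpointReader(stream_state={"vidOffset": 5345144171})
    print(reader.next())        # {'vidOffset': 5345144171} -> resume paginating from this offset
    print(reader.read_state())  # same mapping, until the connector observes a newer offset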