Skip to content

CDK: add support for streams with state attribute #9746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ repos:
rev: 21.11b1
hooks:
- id: black
args: ["--line-length=140"]
- repo: https://github.com/timothycrosley/isort
rev: 5.10.1
hooks:
- id: isort
args: ["--dont-follow-links", "--jobs=-1"]
args:
[
"--settings-path=tools/python/.isort.cfg",
"--dont-follow-links",
"--jobs=-1",
]
additional_dependencies: ["colorama"]
- repo: https://github.com/pre-commit/mirrors-prettier
rev: v2.5.0
Expand All @@ -34,12 +40,14 @@ repos:
rev: v0.0.1a2.post1
hooks:
- id: pyproject-flake8
args: ["--config=tools/python/.flake8"]
additional_dependencies: ["mccabe"]
alias: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910-1
hooks:
- id: mypy
args: ["--config-file=tools/python/.mypy.ini"]
exclude: |
(?x)^.*(
octavia-cli/unit_tests/|
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.48
Add support for streams with explicit state attribute.

## 0.1.47
Fix typing errors.

Expand Down
76 changes: 57 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
:param config: The user-provided configuration as specified by the source's spec.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""

Expand All @@ -65,12 +66,16 @@ def name(self) -> str:
return self.__class__.__name__

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
"""Implements the Discover operation from the Airbyte Specification.
See https://docs.airbyte.io/architecture/airbyte-specification.
"""
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
return AirbyteCatalog(streams=streams)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
"""Implements the Check Connection operation from the Airbyte Specification.
See https://docs.airbyte.io/architecture/airbyte-specification.
"""
try:
check_succeeded, error = self.check_connection(logger, config)
if not check_succeeded:
Expand All @@ -81,7 +86,11 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
connector_state = copy.deepcopy(state or {})
Expand All @@ -96,10 +105,12 @@ def read(
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)

try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
Expand All @@ -108,10 +119,11 @@ def read(
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
raise e
finally:
logger.info(f"Finished syncing {self.name}")
timer.finish_event()
logger.info(f"Finished syncing {configured_stream.stream.name}")
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")
Expand All @@ -131,7 +143,13 @@ def _read_stream(

use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config)
record_iterator = self._read_incremental(
logger,
stream_instance,
configured_stream,
connector_state,
internal_config,
)
else:
record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config)

Expand Down Expand Up @@ -166,19 +184,31 @@ def _read_incremental(
connector_state: MutableMapping[str, Any],
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
"""Read stream using incremental algorithm

:param logger:
:param stream_instance:
:param configured_stream:
:param connector_state:
:param internal_config:
:return:
"""
stream_name = configured_stream.stream.name
stream_state = connector_state.get(stream_name, {})
if stream_state:
if stream_state and 'state' in dir(stream_instance):
stream_instance.state = stream_state
logger.info(f"Setting state of {stream_name} stream to {stream_state}")

slices = stream_instance.stream_slices(
cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
cursor_field=configured_stream.cursor_field,
sync_mode=SyncMode.incremental,
stream_state=stream_state,
)
total_records_counter = 0
for slice in slices:
for _slice in slices:
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=slice,
stream_slice=_slice,
stream_state=stream_state,
cursor_field=configured_stream.cursor_field or None,
)
Expand All @@ -187,7 +217,7 @@ def _read_incremental(
stream_state = stream_instance.get_updated_state(stream_state, record_data)
checkpoint_interval = stream_instance.state_checkpoint_interval
if checkpoint_interval and record_counter % checkpoint_interval == 0:
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
yield self._checkpoint_state(stream_instance, stream_state, connector_state)

total_records_counter += 1
# This functionality should ideally live outside of this method
Expand All @@ -197,28 +227,36 @@ def _read_incremental(
# Break from slice loop to save state and exit from _read_incremental function.
break

yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
yield self._checkpoint_state(stream_instance, stream_state, connector_state)
if self._limit_reached(internal_config, total_records_counter):
return

def _read_full_refresh(
self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig
self,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
total_records_counter = 0
for slice in slices:
records = stream_instance.read_records(
stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field
stream_slice=slice,
sync_mode=SyncMode.full_refresh,
cursor_field=configured_stream.cursor_field,
)
for record in records:
yield self._as_airbyte_record(configured_stream.stream.name, record)
total_records_counter += 1
if self._limit_reached(internal_config, total_records_counter):
return

def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
connector_state[stream_name] = stream_state
def _checkpoint_state(self, stream, stream_state, connector_state):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keu could we maintain the invariant that the state output from this method is always the same as stream.state? I think the only thing I'm concerned about is that if I define both stream.state and get_updated_state then there are two potentially different states floating around which will lead to confusing behavior.

Can we always maintain the invariant that whatever is stored in stream.state contains the state object being output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sherifnada I'm not sure how we can achieve this, _checkpoint_state will always return value from stream.state if there is any, if not it will fallback to the state obtained from get_updated_state,
so what is the problem here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the contract is IncrementalMixin implementation always takes precedence over get_updated_state? sounds fine w me.

Should we add this to the docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will do

try:
connector_state[stream.name] = stream.state
except AttributeError:
connector_state[stream.name] = stream_state

return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))

@lru_cache(maxsize=None)
Expand Down
36 changes: 35 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,40 @@ def package_name_from_class(cls: object) -> str:
return module.__name__.split(".")[0]


class IncrementalMixin(ABC):
"""Mixin to make stream incremental.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Mixing to make stream incremental.
"""Mixin to make stream incremental.


class IncrementalStream(Stream, IncrementalMixin):
@property
def state(self):
return self._state

@state.setter
def state(self, value):
self._state[self.cursor_field] = value[self.cursor_field]
"""

@property
@abstractmethod
def state(self) -> MutableMapping[str, Any]:
"""State getter, should return state in a form that can be serialized to a string and sent to the output
as a STATE AirbyteMessage.

A good example of a state is a cursor_value:
{
self.cursor_field: "cursor_value"
}

State should try to be as small as possible but at the same time descriptive enough to restore
the syncing process from the point where it stopped.
"""

@state.setter
@abstractmethod
def state(self, value: MutableMapping[str, Any]):
"""State setter, accept state serialized by state getter."""


class Stream(ABC):
"""
Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
Expand Down Expand Up @@ -137,7 +171,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
return None

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put a deprecation notice?

"""
""" This method is going to be deprecated in favor of IncrementalMixin, see IncrementalMixin docs for more info.
Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object and return an updated state object.
Expand Down
13 changes: 10 additions & 3 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.47",
version="0.1.48",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -46,7 +46,7 @@
packages=find_packages(exclude=("unit_tests",)),
install_requires=[
"backoff",
"dpath==2.0.1",
"dpath~=2.0.1",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
Expand All @@ -59,7 +59,14 @@
],
python_requires=">=3.7.0",
extras_require={
"dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock", "pytest-httpserver"],
"dev": [
"MyPy~=0.812",
"pytest",
"pytest-cov",
"pytest-mock",
"requests-mock",
"pytest-httpserver",
],
"sphinx-docs": [
"Sphinx~=4.2",
"sphinx-rtd-theme~=1.0",
Expand Down
Loading