[airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code #45413

Merged · 4 commits · Sep 17, 2024
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

@@ -140,7 +140,12 @@
StopConditionPaginationStrategyDecorator,
)
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options import (
DatetimeBasedRequestOptionsProvider,
DefaultRequestOptionsProvider,
InterpolatedRequestOptionsProvider,
RequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever, SimpleRetrieverTestReadDecorator
@@ -653,6 +658,40 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
"per_partition_cursor": combined_slicers if isinstance(combined_slicers, PerPartitionCursor) else None,
"is_global_substream_cursor": isinstance(combined_slicers, GlobalSubstreamCursor),
}

if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
cursor_model = model.incremental_sync

end_time_option = (
RequestOption(
inject_into=RequestOptionType(cursor_model.end_time_option.inject_into.value),
field_name=cursor_model.end_time_option.field_name,
parameters=cursor_model.parameters or {},
)
if cursor_model.end_time_option
else None
)
start_time_option = (
RequestOption(
inject_into=RequestOptionType(cursor_model.start_time_option.inject_into.value),
field_name=cursor_model.start_time_option.field_name,
parameters=cursor_model.parameters or {},
)
if cursor_model.start_time_option
else None
)

request_options_provider = DatetimeBasedRequestOptionsProvider(
start_time_option=start_time_option,
end_time_option=end_time_option,
partition_field_start=cursor_model.partition_field_start,
partition_field_end=cursor_model.partition_field_end,
config=config,
parameters=model.parameters or {},
)
else:
request_options_provider = None

transformations = []
if model.transformations:
for transformation_model in model.transformations:
@@ -663,6 +702,7 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
name=model.name,
primary_key=primary_key,
stream_slicer=combined_slicers,
request_options_provider=request_options_provider,
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
transformations=transformations,
@@ -1126,6 +1166,7 @@ def create_simple_retriever(
name: str,
primary_key: Optional[Union[str, List[str], List[List[str]]]],
stream_slicer: Optional[StreamSlicer],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
@@ -1140,11 +1181,19 @@
client_side_incremental_sync=client_side_incremental_sync,
)
url_base = model.requester.url_base if hasattr(model.requester, "url_base") else requester.get_url_base()

# Define cursor only if per partition or common incremental support is needed
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None

if not stream_slicer or type(stream_slicer) is not DatetimeBasedCursor:
# Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
# Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
# their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor can still act as the SimpleRetriever's
# request_options_provider
request_options_provider = stream_slicer or DefaultRequestOptionsProvider(config=config, parameters={})

stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})

cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
paginator = (
self._create_component_from_model(
@@ -1168,6 +1217,7 @@
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
maximum_number_of_slices=self._limit_slices_fetched or 5,
@@ -1181,6 +1231,7 @@
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
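The dispatch rule above can be illustrated with a standalone sketch (hypothetical stand-in classes, not CDK code): only an exact DatetimeBasedCursor defers to the decoupled provider, while subclasses, which may override the request methods, keep serving as their own provider.

# Standalone illustration with stand-in classes; not CDK code.
class DatetimeBasedCursor: ...
class CustomDatetimeCursor(DatetimeBasedCursor): ...  # may override get_request_params()

def pick_request_options_provider(stream_slicer, decoupled_provider, default_provider):
    # Exact type match: the plain cursor hands request options off to the
    # decoupled DatetimeBasedRequestOptionsProvider built by the factory.
    if stream_slicer is not None and type(stream_slicer) is DatetimeBasedCursor:
        return decoupled_provider
    # Subclasses (custom components) keep acting as their own provider;
    # with no slicer at all, fall back to the no-op default provider.
    return stream_slicer or default_provider

assert pick_request_options_provider(DatetimeBasedCursor(), "decoupled", "default") == "decoupled"
custom = CustomDatetimeCursor()
assert pick_request_options_provider(custom, "decoupled", "default") is custom
assert pick_request_options_provider(None, "decoupled", "default") == "default"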
File: airbyte_cdk/sources/declarative/requesters/request_options/__init__.py
@@ -2,9 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import (
DatetimeBasedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import DefaultRequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider

__all__ = ["InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
__all__ = ["DatetimeBasedRequestOptionsProvider", "DefaultRequestOptionsProvider", "InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
Contributor:
nit: should we add DefaultRequestOptionsProvider to the CDK's top-level __init__.py?

Contributor (Author):
yep, will add!

File: airbyte_cdk/sources/declarative/requesters/request_options/datetime_based_request_options_provider.py
@@ -0,0 +1,84 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
"""
Request options provider that extracts fields from the stream_slice and injects them into the respective location in the
outbound request being made
"""

config: Config
parameters: InitVar[Mapping[str, Any]]
start_time_option: Optional[RequestOption] = None
end_time_option: Optional[RequestOption] = None
partition_field_start: Optional[str] = None
partition_field_end: Optional[str] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters)
self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters)

def stream_slices(self) -> Iterable[StreamSlice]:
# When all processing is managed by the ConcurrentCursor and concurrent read processing, this method shouldn't end up being used.
# It kinda sucks that we have to implement this given that partition generation should really just be a responsibility of the
# cursor, but making this class implement StreamSlicer makes it easier in the model_to_component_factory
yield from [{}]

def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.header, stream_slice)

def get_request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
return self._get_request_options(RequestOptionType.body_data, stream_slice)

def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json, stream_slice)

def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
options: MutableMapping[str, Any] = {}
if not stream_slice:
return options
if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore # field_name is always casted to an interpolated string
self._partition_field_start.eval(self.config)
)
if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self._partition_field_end.eval(self.config)) # type: ignore # field_name is always casted to an interpolated string
return options
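
A minimal usage sketch of the provider follows; the field names and slice values are assumptions for illustration, not taken from the PR:

# Illustrative usage; parameter names like "created[gte]" are invented.
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options import DatetimeBasedRequestOptionsProvider

provider = DatetimeBasedRequestOptionsProvider(
    config={},
    parameters={},
    start_time_option=RequestOption(
        field_name="created[gte]", inject_into=RequestOptionType.request_parameter, parameters={}
    ),
    end_time_option=RequestOption(
        field_name="created[lte]", inject_into=RequestOptionType.request_parameter, parameters={}
    ),
)

# The slice boundaries produced by the datetime cursor land in the query string.
params = provider.get_request_params(stream_slice={"start_time": "2024-01-01", "end_time": "2024-01-31"})
assert params == {"created[gte]": "2024-01-01", "created[lte]": "2024-01-31"}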
File: airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py
@@ -0,0 +1,59 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DefaultRequestOptionsProvider(RequestOptionsProvider):
"""
Request options provider that returns no request options. Serves as the default no-op provider for streams that
have no incremental sync or slicing-derived request options to inject into the outbound request.
"""

config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
pass

def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
return {}

def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}
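
A short sketch of the no-op behavior:

# The default provider contributes nothing, which lets retrievers without a
# datetime-based cursor satisfy the RequestOptionsProvider interface.
from airbyte_cdk.sources.declarative.requesters.request_options import DefaultRequestOptionsProvider

provider = DefaultRequestOptionsProvider(config={}, parameters={})
assert provider.get_request_params() == {}
assert provider.get_request_headers() == {}
assert provider.get_request_body_json() == {}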
File: airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -1,6 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
from dataclasses import InitVar, dataclass, field
from functools import partial
@@ -16,6 +17,7 @@
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.request_options import DefaultRequestOptionsProvider, RequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
@@ -61,6 +63,7 @@ class SimpleRetriever(Retriever):
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
stream_slicer: StreamSlicer = field(default_factory=lambda: SinglePartitionRouter(parameters={}))
request_option_provider: Optional[RequestOptionsProvider] = None
cursor: Optional[DeclarativeCursor] = None
ignore_stream_slicer_parameters_on_paginated_requests: bool = False

@@ -71,6 +74,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._last_record: Optional[Record] = None
self._parameters = parameters
self._name = InterpolatedString(self._name, parameters=parameters) if isinstance(self._name, str) else self._name
self.request_option_provider = self.request_option_provider or DefaultRequestOptionsProvider(config=self.config, parameters={})

# This mapping is used during a resumable full refresh syncs to indicate whether a partition has started syncing
# records. Partitions serve as the key and map to True if they already began processing records
@@ -158,7 +162,7 @@ def _request_params(
stream_slice,
next_page_token,
self._paginator.get_request_params,
self.request_option_provider.get_request_params,
)
if isinstance(params, str):
raise ValueError("Request params cannot be a string")
@@ -184,7 +188,7 @@ def _request_body_data(
stream_slice,
next_page_token,
self._paginator.get_request_body_data,
self.request_option_provider.get_request_body_data,
)

def _request_body_json(
@@ -203,7 +207,7 @@ def _request_body_json(
stream_slice,
next_page_token,
self._paginator.get_request_body_json,
self.request_option_provider.get_request_body_json,
)
if isinstance(body_json, str):
raise ValueError("Request body json cannot be a string")
File: airbyte_cdk/sources/streams/concurrent/cursor.py
@@ -6,11 +6,13 @@
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import AbstractStreamStateConverter
from airbyte_cdk.sources.types import Config


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
@@ -58,6 +60,23 @@ def extract_value(self, record: Record) -> CursorValueType:
return cursor_value # type: ignore # we assume that the value the path points at is a comparable


Contributor (Author):
I've added this new InterpolatedCursorField so that we can perform interpolation evaluation at processing time, when we observe records, instead of at parse time, when we build the ConcurrentCursor.

I feel like this is a nice-to-have from the mental model of interpolation only being used at runtime, but the unfortunate side effect is that we leak interpolation as a concept into the concurrent CDK. I've included the code to spark discussion, but my vote is that we remove this and just perform the interpolation ahead of time when we instantiate the ConcurrentCursor.

class InterpolatedCursorField(CursorField):
def __init__(self, cursor_field_key: InterpolatedString, config: Config):
self._cursor_field_key = cursor_field_key
self.config = config

@property
def cursor_field_key(self) -> str:
return self._cursor_field_key.eval(config=self.config)

def extract_value(self, record: Record) -> CursorValueType:
resolved_cursor_field_key = self._cursor_field_key.eval(config=self.config)
cursor_value = record.data.get(resolved_cursor_field_key)
if cursor_value is None:
raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
return cursor_value # type: ignore # we assume that the value the path points at is a comparable
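
A small sketch of the intended behavior (assumed usage, not included in the PR):

# The cursor field key is resolved against config when accessed, not at parse time.
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

field = InterpolatedCursorField(
    cursor_field_key=InterpolatedString.create("{{ config['cursor_field'] }}", parameters={}),
    config={"cursor_field": "updated_at"},
)
assert field.cursor_field_key == "updated_at"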


class Cursor(ABC):
@property
@abstractmethod
@@ -143,6 +162,7 @@ def __init__(
end_provider: Callable[[], CursorValueType],
lookback_window: Optional[GapType] = None,
slice_range: Optional[GapType] = None,
cursor_granularity: Optional[GapType] = None,
Contributor (Author):
cursor_granularity is a part of low-code cursors, but not concurrent ones. The main lever it turns is reducing record duplicates from date window edges that overlap.

Our overall stance has been that some duplicates over the course of a sync are tolerable, but we should never lose records. After auditing our existing repository connectors, we have 14 cases where we use "P1D" or greater as the granularity. I would consider that a potentially unacceptable number of duplicates, and it affects some of our more highly used connectors, hence why I've added this in.

Contributor:
thanks for checking how often the feature is used

Contributor:
I like the change of supporting granularity here.

On the topic of duplication and big granularities, isn't it driven by the API granularity? For example, if an API only takes the format YYYY-MM as an input, I don't see how we can have anything other than P1M as the cursor granularity.

Contributor (Author):
At the largest sizes we do start to lose some options, although I don't think month or year granularity is that common; I'm not sure I've ever seen it. Bad API design, in my opinion. There's an interesting question of inference here: a user whose datetimes are hour-granular would almost certainly want hour granularity. But that might be more a matter of the low-code interface than of the underlying cursor, which would still need cursor_granularity to be configurable either way.
) -> None:
self._stream_name = stream_name
self._stream_namespace = stream_namespace
Expand All @@ -159,6 +179,7 @@ def __init__(
self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
self._lookback_window = lookback_window
self._slice_range = slice_range
self._cursor_granularity = cursor_granularity
Contributor:
nit: We might have (a very small piece of) logic that is duplicated. My understanding is that this value is basically defined as part of the implementation of AbstractStreamStateConverter.increment. Is this fair? If so, would there be a way to avoid duplicating this logic?

Contributor (Author):
The usage of DateTimeStreamStateConverter.increment() might not actually be duplicated, but it is an area I had not originally accounted for, so in that regard you found a bug I need to fix! The two usages of the method:

  • _compare_interval(): which is, interestingly, completely unused in our code
  • merge_intervals(): I need to fix the increment method so that DateTimeStreamStateConverter takes in an optional cursor_granularity parameter, and we should use that in place of timedelta(milliseconds=1). Otherwise we won't properly merge partitions together, since they won't line up correctly.

I'll adjust the code accordingly and add a test case for this!

@property
def state(self) -> MutableMapping[str, Any]:
@@ -312,7 +333,10 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType)
current_lower_boundary = lower
while not stop_processing:
current_upper_boundary = min(current_lower_boundary + self._slice_range, upper)
if self._cursor_granularity:
yield current_lower_boundary, current_upper_boundary - self._cursor_granularity
else:
yield current_lower_boundary, current_upper_boundary
current_lower_boundary = current_upper_boundary
if current_upper_boundary >= upper:
stop_processing = True
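
A standalone trace of the new slicing behavior (a simplified re-implementation with assumed datetime values, not the CDK method itself):

# Simplified re-implementation of _split_per_slice_range for illustration only.
from datetime import datetime, timedelta

def split(lower, upper, slice_range, cursor_granularity=None):
    current_lower = lower
    while True:
        current_upper = min(current_lower + slice_range, upper)
        if cursor_granularity:
            yield current_lower, current_upper - cursor_granularity
        else:
            yield current_lower, current_upper
        current_lower = current_upper
        if current_upper >= upper:
            break

start, end = datetime(2024, 1, 1), datetime(2024, 1, 11)

# Without granularity, consecutive windows share an edge, so a record stamped
# exactly at Jan 6 can be picked up by both slices:
print(list(split(start, end, timedelta(days=5))))
# -> [(Jan 1, Jan 6), (Jan 6, Jan 11)]

# With a one-day cursor_granularity (P1D), the shared edges disappear:
print(list(split(start, end, timedelta(days=5), timedelta(days=1))))
# -> [(Jan 1, Jan 5), (Jan 6, Jan 10)]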