[airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code #45413
@@ -0,0 +1,84 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
    """
    Request options provider that extracts fields from the stream_slice and injects them into the respective location in the
    outbound request being made
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters)
        self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters)

    def stream_slices(self) -> Iterable[StreamSlice]:
        # When all processing is managed by the ConcurrentCursor and concurrent read processing, this method shouldn't end up
        # being used. It is unfortunate that we have to implement it, since partition generation should really be a
        # responsibility of the cursor, but making this class implement StreamSlicer makes it easier in the
        # model_to_component_factory.
        yield from [{}]

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        return self._get_request_options(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_json, stream_slice)

    def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
        options: MutableMapping[str, Any] = {}
        if not stream_slice:
            return options
        if self.start_time_option and self.start_time_option.inject_into == option_type:
            options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get(  # type: ignore  # field_name is always cast to an interpolated string
                self._partition_field_start.eval(self.config)
            )
        if self.end_time_option and self.end_time_option.inject_into == option_type:
            options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self._partition_field_end.eval(self.config))  # type: ignore  # field_name is always cast to an interpolated string
        return options
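For illustration, here is a minimal sketch of how this provider could be exercised on its own. The `RequestOption` arguments, the `created[gte]`/`created[lte]` field names, the slice keys, and the expected output are assumptions based on the diff above, not code from this PR:

```python
# Illustrative sketch only: assumes the RequestOption/StreamSlice signatures used elsewhere in the
# declarative CDK, and that the cursor emits "start_time"/"end_time" keys in each stream slice.
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.types import StreamSlice

provider = DatetimeBasedRequestOptionsProvider(
    config={},
    parameters={},
    start_time_option=RequestOption(field_name="created[gte]", inject_into=RequestOptionType.request_parameter, parameters={}),
    end_time_option=RequestOption(field_name="created[lte]", inject_into=RequestOptionType.request_parameter, parameters={}),
)

# A slice as it might be produced by the cursor; the date values are made up.
slice_ = StreamSlice(partition={}, cursor_slice={"start_time": "2024-01-01", "end_time": "2024-01-31"})

# The slice boundaries are injected into the request parameters, and nowhere else.
assert provider.get_request_params(stream_slice=slice_) == {"created[gte]": "2024-01-01", "created[lte]": "2024-01-31"}
assert provider.get_request_headers(stream_slice=slice_) == {}
```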
@@ -0,0 +1,59 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DefaultRequestOptionsProvider(RequestOptionsProvider):
    """
    Request options provider that returns no request options for any part of the outbound request. Used when a stream
    does not need to inject values from the stream_slice into the requests it makes.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        pass

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        return {}

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
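As a quick illustration of the contract implied by the code above: every accessor returns an empty mapping regardless of its inputs, so a stream whose requests don't depend on the slice can use this provider as a drop-in no-op. The slice contents below are made up:

```python
# Minimal sketch: all four accessors ignore their inputs and return empty mappings.
from airbyte_cdk.sources.types import StreamSlice

provider = DefaultRequestOptionsProvider(config={}, parameters={})
slice_ = StreamSlice(partition={}, cursor_slice={"start_time": "2024-01-01"})

assert provider.get_request_params(stream_slice=slice_) == {}
assert provider.get_request_headers(stream_slice=slice_) == {}
assert provider.get_request_body_data() == {}
assert provider.get_request_body_json() == {}
```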
@@ -6,11 +6,13 @@
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import AbstractStreamStateConverter
from airbyte_cdk.sources.types import Config


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:

@@ -58,6 +60,23 @@ def extract_value(self, record: Record) -> CursorValueType:
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable


class InterpolatedCursorField(CursorField):
Review comment: I've added this new `InterpolatedCursorField`. I feel like this is a nice-to-have from the mental model of interpolation only being used at runtime, but the unfortunate side effect is that we leak interpolation as a concept into the Concurrent CDK. I've included the code to spark discussion, but my vote is that we remove this and just perform the interpolation ahead of time at instantiation (see the sketch after this hunk).
    def __init__(self, cursor_field_key: InterpolatedString, config: Config):
        self._cursor_field_key = cursor_field_key
        self.config = config

    @property
    def cursor_field_key(self) -> str:
        return self._cursor_field_key.eval(config=self.config)

    def extract_value(self, record: Record) -> CursorValueType:
        resolved_cursor_field_key = self._cursor_field_key.eval(config=self.config)
        cursor_value = record.data.get(resolved_cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable


class Cursor(ABC):
    @property
    @abstractmethod
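Below is a rough sketch of the alternative floated in the review comment above: resolve the interpolated cursor field once when the declarative components are built, and hand the concurrent CDK a plain `CursorField` so interpolation never leaks into it. The helper name and wiring are hypothetical, not part of this PR:

```python
# Hypothetical factory-side helper: evaluate the interpolation eagerly and construct the
# existing CursorField with the already-resolved key.
from typing import Any, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.types import Config


def build_cursor_field(cursor_field: str, config: Config, parameters: Mapping[str, Any]) -> CursorField:
    # Interpolation happens here, at component-construction time, so the concurrent cursor
    # only ever sees a plain string key.
    resolved_key = InterpolatedString.create(cursor_field, parameters=parameters).eval(config=config)
    return CursorField(resolved_key)


# e.g. a manifest value like "{{ config['cursor_field'] }}" resolves to "updated_at" here.
cursor_field = build_cursor_field("{{ config['cursor_field'] }}", config={"cursor_field": "updated_at"}, parameters={})
```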
@@ -143,6 +162,7 @@ def __init__(
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
Review comment: Our overall stance has been that some duplicates over the course of a sync are tolerable, but we should never lose records. After auditing our existing repository connectors, we have 14 cases where we use "P1D" or greater as the granularity. I would consider that a potentially unacceptable number of duplicates, and it is on some of our more highly used connectors, hence why I've added this in.

Reply: Thanks for checking how often the feature is used.

Reply: I like the change of supporting granularity here. On the topic of duplication and big granularities, isn't it driven by the API's granularity? For example, if an API only takes a format …

Reply: At the largest sizes we do start to lose some options, although I don't think the month or year option is that common, if I've ever seen it. Bad API design in my opinion. There's an interesting question of inference, in that a user using a datetime with hours would almost certainly (99% of the time) use hour granularity. But that might be more a concern of the low-code interface as opposed to the underlying cursor, which we would still need.
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace

@@ -159,6 +179,7 @@ def __init__(
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._cursor_granularity = cursor_granularity
Review comment: nit: We might have a (very small) piece of logic that is duplicated. My understanding is that this value is basically defined as part of the implementation of …

Reply: The usage of … I'll adjust the code accordingly and add a test case for this!

    @property
    def state(self) -> MutableMapping[str, Any]:

@@ -312,7 +333,10 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType)
        current_lower_boundary = lower
        while not stop_processing:
            current_upper_boundary = min(current_lower_boundary + self._slice_range, upper)
-           yield current_lower_boundary, current_upper_boundary
+           if self._cursor_granularity:
+               yield current_lower_boundary, current_upper_boundary - self._cursor_granularity
+           else:
+               yield current_lower_boundary, current_upper_boundary
            current_lower_boundary = current_upper_boundary
            if current_upper_boundary >= upper:
                stop_processing = True
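To make the effect of the new `cursor_granularity` handling in `_split_per_slice_range` concrete, here is a small standalone re-implementation of the same boundary logic with made-up dates (illustration only, not CDK code):

```python
# Standalone sketch of the boundary logic above, to show why subtracting the granularity
# removes the overlap between adjacent slices.
from datetime import datetime, timedelta


def split(lower, upper, slice_range, granularity=None):
    slices = []
    current_lower = lower
    stop_processing = False
    while not stop_processing:
        current_upper = min(current_lower + slice_range, upper)
        slices.append((current_lower, current_upper - granularity) if granularity else (current_lower, current_upper))
        current_lower = current_upper
        if current_upper >= upper:
            stop_processing = True
    return slices


start, end = datetime(2024, 1, 1), datetime(2024, 1, 21)

# Without granularity, Jan 11 is both the upper bound of the first slice and the lower bound
# of the second, so records stamped exactly on the boundary can be read twice.
print(split(start, end, slice_range=timedelta(days=10)))

# With a one-day granularity the upper bound is pulled back by one day, so adjacent slices
# no longer share a boundary value.
print(split(start, end, slice_range=timedelta(days=10), granularity=timedelta(days=1)))
```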
Review comment: nit: should we add `DefaultRequestOptionsProvider` to the CDK's top-level `__init__.py`?

Reply: Yep, will add!
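For reference, that export might look roughly like the following. The module path is a guess based on the class name and the package used by the imports above, since the new file's location isn't visible in this view:

```python
# Hypothetical snippet for airbyte_cdk/__init__.py (module path assumed, not shown in this diff).
from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import (
    DefaultRequestOptionsProvider,
)

__all__ = [
    # ... existing exports ...
    "DefaultRequestOptionsProvider",
]
```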