-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code #45413
Changes from 2 commits
2cf2151
dfaef88
28fa872
52274e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from dataclasses import InitVar, dataclass | ||
from typing import Any, Mapping, MutableMapping, Optional, Union | ||
|
||
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString | ||
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType | ||
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider | ||
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState | ||
|
||
|
||
@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
    """
    Copies the start/end boundary values of a stream slice into the outbound request.

    Each boundary is optional: a boundary is only injected when its RequestOption is set and targets the
    request location (params, headers, body data or body json) currently being built.

    Attributes:
        config: connector configuration used to evaluate interpolated names
        parameters: parameters forwarded to the interpolated partition field names
        start_time_option: where (and under which name) to inject the slice's start value
        end_time_option: where (and under which name) to inject the slice's end value
        partition_field_start: key of the start value within the stream slice, "start_time" when unset
        partition_field_end: key of the end value within the stream slice, "end_time" when unset
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Build the interpolated slice keys, falling back to "start_time" / "end_time" when none is configured."""
        start_key = self.partition_field_start or "start_time"
        end_key = self.partition_field_end or "end_time"
        self._partition_field_start = InterpolatedString.create(start_key, parameters=parameters)
        self._partition_field_end = InterpolatedString.create(end_key, parameters=parameters)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured for injection as request parameters."""
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured for injection as request headers."""
        return self._get_request_options(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the slice boundaries configured for injection into the request body data."""
        return self._get_request_options(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured for injection into the JSON request body."""
        return self._get_request_options(RequestOptionType.body_json, stream_slice)

    def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
        """
        Collect the boundary values that must be injected into the given request location.

        :param option_type: request location currently being built
        :param stream_slice: slice holding the boundary values; a missing or empty slice yields no options
        :return: mapping of evaluated option name to the value read from the slice (None when the slice lacks the key)
        """
        injected: MutableMapping[str, Any] = {}
        if not stream_slice:
            return injected

        # Start is handled before end so that, should both resolve to the same name, the end value wins.
        boundaries = (
            (self.start_time_option, self._partition_field_start),
            (self.end_time_option, self._partition_field_end),
        )
        for request_option, slice_field in boundaries:
            if not request_option or request_option.inject_into != option_type:
                continue
            boundary_value = stream_slice.get(slice_field.eval(self.config))
            injected[request_option.field_name.eval(config=self.config)] = boundary_value  # type: ignore # field_name is always casted to an interpolated string
        return injected
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from dataclasses import InitVar, dataclass | ||
from typing import Any, Mapping, Optional, Union | ||
|
||
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider | ||
from airbyte_cdk.sources.types import StreamSlice, StreamState | ||
|
||
|
||
@dataclass
class DefaultRequestOptionsProvider(RequestOptionsProvider):
    """
    No-op request options provider: it never contributes anything to the outbound request.

    Every getter ignores its arguments and returns an empty mapping, which makes this class suitable as a
    placeholder when no component needs to inject request options.

    Attributes:
        parameters: accepted for constructor compatibility; not used by this provider
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Nothing to initialise; `parameters` is intentionally ignored."""

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no request parameters are added."""
        return {}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no request headers are added."""
        return {}

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return an empty mapping: no body data is added."""
        return {}

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no JSON body fields are added."""
        return {}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import json | ||
from dataclasses import InitVar, dataclass, field | ||
from functools import partial | ||
|
@@ -16,6 +17,7 @@ | |
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter | ||
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination | ||
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator | ||
from airbyte_cdk.sources.declarative.requesters.request_options import DefaultRequestOptionsProvider, RequestOptionsProvider | ||
from airbyte_cdk.sources.declarative.requesters.requester import Requester | ||
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever | ||
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer | ||
|
@@ -61,6 +63,7 @@ class SimpleRetriever(Retriever): | |
_primary_key: str = field(init=False, repr=False, default="") | ||
paginator: Optional[Paginator] = None | ||
stream_slicer: StreamSlicer = field(default_factory=lambda: SinglePartitionRouter(parameters={})) | ||
request_option_provider: RequestOptionsProvider = field(default_factory=lambda: DefaultRequestOptionsProvider(parameters={})) | ||
cursor: Optional[DeclarativeCursor] = None | ||
ignore_stream_slicer_parameters_on_paginated_requests: bool = False | ||
|
||
|
@@ -158,7 +161,7 @@ def _request_params( | |
stream_slice, | ||
next_page_token, | ||
self._paginator.get_request_params, | ||
self.stream_slicer.get_request_params, | ||
self.request_option_provider.get_request_params, | ||
) | ||
if isinstance(params, str): | ||
raise ValueError("Request params cannot be a string") | ||
|
@@ -184,7 +187,7 @@ def _request_body_data( | |
stream_slice, | ||
next_page_token, | ||
self._paginator.get_request_body_data, | ||
self.stream_slicer.get_request_body_data, | ||
self.request_option_provider.get_request_body_data, | ||
) | ||
|
||
def _request_body_json( | ||
|
@@ -203,7 +206,7 @@ def _request_body_json( | |
stream_slice, | ||
next_page_token, | ||
self._paginator.get_request_body_json, | ||
self.stream_slicer.get_request_body_json, | ||
self.request_option_provider.get_request_body_json, | ||
) | ||
if isinstance(body_json, str): | ||
raise ValueError("Request body json cannot be a string") | ||
|
@@ -231,21 +234,21 @@ def _parse_response( | |
) -> Iterable[Record]: | ||
if not response: | ||
self._last_response = None | ||
return [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch! |
||
|
||
self._last_response = response | ||
record_generator = self.record_selector.select_records( | ||
response=response, | ||
stream_state=stream_state, | ||
records_schema=records_schema, | ||
stream_slice=stream_slice, | ||
next_page_token=next_page_token, | ||
) | ||
self._last_page_size = 0 | ||
for record in record_generator: | ||
self._last_page_size += 1 | ||
self._last_record = record | ||
yield record | ||
yield from [] | ||
else: | ||
self._last_response = response | ||
record_generator = self.record_selector.select_records( | ||
response=response, | ||
stream_state=stream_state, | ||
records_schema=records_schema, | ||
stream_slice=stream_slice, | ||
next_page_token=next_page_token, | ||
) | ||
self._last_page_size = 0 | ||
for record in record_generator: | ||
self._last_page_size += 1 | ||
self._last_record = record | ||
yield record | ||
|
||
@property # type: ignore | ||
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import functools | ||
from abc import ABC, abstractmethod | ||
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple | ||
|
@@ -143,6 +144,7 @@ def __init__( | |
end_provider: Callable[[], CursorValueType], | ||
lookback_window: Optional[GapType] = None, | ||
slice_range: Optional[GapType] = None, | ||
cursor_granularity: Optional[GapType] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Our overall stance has been that some duplicates over the course of a sync are tolerable, but we should never lose records. After auditing our existing repository connectors, we have 14 cases where we use "P1D" or greater as granularity. I would consider this to be a potentially unacceptable number of duplicates, and it is on some of our more highly used connectors, which is why I've added this in.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Thanks for checking how often the feature is used.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I like the change of supporting granularity here. For the topic of duplication and big granularities, isn't it driven by the API granularity? For example, if an API only takes format
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
At the largest sizes we do start to lose some options, although I don't think the month or year option is that common, and I'm not sure I've ever seen it. Bad API design in my opinion. I think there's an interesting question of inference, in that there's probably a 99% chance a user using a datetime format with hours would use hour granularity. But that might be more of a low-code interface concern as opposed to the underlying cursor, which we would still need |
||
) -> None: | ||
self._stream_name = stream_name | ||
self._stream_namespace = stream_namespace | ||
|
@@ -159,6 +161,7 @@ def __init__( | |
self.start, self._concurrent_state = self._get_concurrent_state(stream_state) | ||
self._lookback_window = lookback_window | ||
self._slice_range = slice_range | ||
self._cursor_granularity = cursor_granularity | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: We might have (a very small piece of) logic that is duplicated. My understanding is that this value is basically defined as part of the implementation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The usage of
I'll adjust the code accordingly and add a test case for this! |
||
|
||
@property | ||
def state(self) -> MutableMapping[str, Any]: | ||
|
@@ -312,7 +315,10 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType) | |
current_lower_boundary = lower | ||
while not stop_processing: | ||
current_upper_boundary = min(current_lower_boundary + self._slice_range, upper) | ||
yield current_lower_boundary, current_upper_boundary | ||
if self._cursor_granularity: | ||
yield current_lower_boundary, current_upper_boundary - self._cursor_granularity | ||
else: | ||
yield current_lower_boundary, current_upper_boundary | ||
current_lower_boundary = current_upper_boundary | ||
if current_upper_boundary >= upper: | ||
stop_processing = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we add
DefaultRequestOptionsProvider
to the CDK's top-level `__init__.py`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep will add!