Skip to content

Commit 199a807

Browse files
authored
[airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code (#45413)
1 parent 32b8648 commit 199a807

File tree

12 files changed

+1030
-45
lines changed

12 files changed

+1030
-45
lines changed

airbyte-cdk/python/airbyte_cdk/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
from .sources.declarative.requesters.request_option import RequestOption, RequestOptionType
4949

50+
from .sources.declarative.requesters.request_options.default_request_options_provider import DefaultRequestOptionsProvider
5051
from .sources.declarative.requesters.request_options.interpolated_request_input_provider import InterpolatedRequestInputProvider
5152
from .sources.declarative.requesters.requester import HttpMethod
5253
from .sources.declarative.retrievers import SimpleRetriever
@@ -133,6 +134,7 @@
133134
"DeclarativeStream",
134135
"Decoder",
135136
"DefaultPaginator",
137+
"DefaultRequestOptionsProvider",
136138
"DpathExtractor",
137139
"FieldPointer",
138140
"HttpMethod",

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+55-2
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@
140140
StopConditionPaginationStrategyDecorator,
141141
)
142142
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
143-
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
143+
from airbyte_cdk.sources.declarative.requesters.request_options import (
144+
DatetimeBasedRequestOptionsProvider,
145+
DefaultRequestOptionsProvider,
146+
InterpolatedRequestOptionsProvider,
147+
RequestOptionsProvider,
148+
)
144149
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
145150
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
146151
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever, SimpleRetrieverTestReadDecorator
@@ -653,6 +658,40 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
653658
"per_partition_cursor": combined_slicers if isinstance(combined_slicers, PerPartitionCursor) else None,
654659
"is_global_substream_cursor": isinstance(combined_slicers, GlobalSubstreamCursor),
655660
}
661+
662+
# Build a request options provider that is independent of the cursor. Only the stock DatetimeBasedCursor model is
# handled here; for any other incremental_sync configuration the provider is left as None and the retriever factory
# decides which fallback to use.
if isinstance(model.incremental_sync, DatetimeBasedCursorModel):
    cursor_model = model.incremental_sync

    end_time_option = (
        RequestOption(
            inject_into=RequestOptionType(cursor_model.end_time_option.inject_into.value),
            field_name=cursor_model.end_time_option.field_name,
            parameters=cursor_model.parameters or {},
        )
        if cursor_model.end_time_option
        else None
    )
    start_time_option = (
        RequestOption(
            inject_into=RequestOptionType(cursor_model.start_time_option.inject_into.value),
            field_name=cursor_model.start_time_option.field_name,
            parameters=cursor_model.parameters or {},
        )
        if cursor_model.start_time_option
        else None
    )

    request_options_provider = DatetimeBasedRequestOptionsProvider(
        start_time_option=start_time_option,
        end_time_option=end_time_option,
        # The start field name must come from partition_field_start. Passing partition_field_end here would make the
        # provider read the slice's end value (or ignore a custom start field name) when injecting the start time.
        partition_field_start=cursor_model.partition_field_start,
        partition_field_end=cursor_model.partition_field_end,
        config=config,
        parameters=model.parameters or {},
    )
else:
    request_options_provider = None
694+
656695
transformations = []
657696
if model.transformations:
658697
for transformation_model in model.transformations:
@@ -663,6 +702,7 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
663702
name=model.name,
664703
primary_key=primary_key,
665704
stream_slicer=combined_slicers,
705+
request_options_provider=request_options_provider,
666706
stop_condition_on_cursor=stop_condition_on_cursor,
667707
client_side_incremental_sync=client_side_incremental_sync,
668708
transformations=transformations,
@@ -1126,6 +1166,7 @@ def create_simple_retriever(
11261166
name: str,
11271167
primary_key: Optional[Union[str, List[str], List[List[str]]]],
11281168
stream_slicer: Optional[StreamSlicer],
1169+
request_options_provider: Optional[RequestOptionsProvider] = None,
11291170
stop_condition_on_cursor: bool = False,
11301171
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
11311172
transformations: List[RecordTransformation],
@@ -1140,11 +1181,21 @@ def create_simple_retriever(
11401181
client_side_incremental_sync=client_side_incremental_sync,
11411182
)
11421183
url_base = model.requester.url_base if hasattr(model.requester, "url_base") else requester.get_url_base()
1143-
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
11441184

11451185
# Define cursor only if per partition or common incremental support is needed
11461186
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
11471187

1188+
# Exact type comparison on purpose: only the stock DatetimeBasedCursor hands request option injection over to the
# decoupled provider. Subclasses, other slicers/cursors and a missing slicer (None) all take the first branch.
if type(stream_slicer) is not DatetimeBasedCursor:
    # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
    # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
    # their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor can still act as the
    # SimpleRetriever's request_options_provider. Note that this branch replaces any provider passed in by the caller.
    request_options_provider = stream_slicer or DefaultRequestOptionsProvider(parameters={})
elif not request_options_provider:
    # Stock DatetimeBasedCursor but the caller supplied no provider: fall back to one that injects nothing.
    request_options_provider = DefaultRequestOptionsProvider(parameters={})

# Must happen after the provider selection above, which relies on stream_slicer still being None when no slicer was given.
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
1198+
11481199
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
11491200
paginator = (
11501201
self._create_component_from_model(
@@ -1168,6 +1219,7 @@ def create_simple_retriever(
11681219
requester=requester,
11691220
record_selector=record_selector,
11701221
stream_slicer=stream_slicer,
1222+
request_option_provider=request_options_provider,
11711223
cursor=cursor,
11721224
config=config,
11731225
maximum_number_of_slices=self._limit_slices_fetched or 5,
@@ -1181,6 +1233,7 @@ def create_simple_retriever(
11811233
requester=requester,
11821234
record_selector=record_selector,
11831235
stream_slicer=stream_slicer,
1236+
request_option_provider=request_options_provider,
11841237
cursor=cursor,
11851238
config=config,
11861239
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/__init__.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import (
6+
DatetimeBasedRequestOptionsProvider,
7+
)
8+
from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import DefaultRequestOptionsProvider
59
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
610
InterpolatedRequestOptionsProvider,
711
)
812
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
913

10-
__all__ = ["InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
14+
__all__ = ["DatetimeBasedRequestOptionsProvider", "DefaultRequestOptionsProvider", "InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Mapping, MutableMapping, Optional, Union
7+
8+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
9+
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
10+
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
11+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
12+
13+
14+
@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
    """
    Request options provider that copies the start and end values of a stream slice into the outbound request.

    Each of `start_time_option` and `end_time_option` names the request location (params, headers, body data or
    body JSON) and the field name to use. The value is looked up in the stream slice under `partition_field_start`
    and `partition_field_end`, which default to "start_time" and "end_time" when not provided.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # The slice key names may themselves be interpolated, so they are wrapped once here and evaluated per request.
        start_field = self.partition_field_start or "start_time"
        end_field = self.partition_field_end or "end_time"
        self._partition_field_start = InterpolatedString.create(start_field, parameters=parameters)
        self._partition_field_end = InterpolatedString.create(end_field, parameters=parameters)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured to be sent as query parameters."""
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured to be sent as request headers."""
        return self._get_request_options(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the slice boundaries configured to be sent as body data."""
        return self._get_request_options(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured to be sent in the JSON body."""
        return self._get_request_options(RequestOptionType.body_json, stream_slice)

    def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
        """
        Collect the start/end options whose injection target equals `option_type`.

        Returns an empty mapping when there is no (or an empty) stream slice. The start option is written before the
        end option, so the end value wins if both resolve to the same field name.
        """
        injected: MutableMapping[str, Any] = {}
        if stream_slice:
            boundaries = (
                (self.start_time_option, self._partition_field_start),
                (self.end_time_option, self._partition_field_end),
            )
            for request_option, slice_field in boundaries:
                if request_option and request_option.inject_into == option_type:
                    # field_name is always casted to an interpolated string
                    injected[request_option.field_name.eval(config=self.config)] = stream_slice.get(  # type: ignore
                        slice_field.eval(self.config)
                    )
        return injected
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Mapping, Optional, Union
7+
8+
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
9+
from airbyte_cdk.sources.types import StreamSlice, StreamState
10+
11+
12+
@dataclass
class DefaultRequestOptionsProvider(RequestOptionsProvider):
    """
    No-op request options provider: every getter returns an empty mapping, so nothing is added to the outbound
    request. The stream state, stream slice and next page token arguments are accepted but ignored.
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # `parameters` is accepted for constructor compatibility with other components; it is not used.
        pass

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no query parameters are contributed."""
        return {}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no headers are contributed."""
        return {}

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return an empty mapping: no body data is contributed."""
        return {}

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no JSON body fields are contributed."""
        return {}

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

+21-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
45
import json
56
from dataclasses import InitVar, dataclass, field
67
from functools import partial
@@ -16,6 +17,7 @@
1617
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter
1718
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
1819
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
20+
from airbyte_cdk.sources.declarative.requesters.request_options import DefaultRequestOptionsProvider, RequestOptionsProvider
1921
from airbyte_cdk.sources.declarative.requesters.requester import Requester
2022
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
2123
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
@@ -61,6 +63,7 @@ class SimpleRetriever(Retriever):
6163
_primary_key: str = field(init=False, repr=False, default="")
6264
paginator: Optional[Paginator] = None
6365
stream_slicer: StreamSlicer = field(default_factory=lambda: SinglePartitionRouter(parameters={}))
66+
request_option_provider: RequestOptionsProvider = field(default_factory=lambda: DefaultRequestOptionsProvider(parameters={}))
6467
cursor: Optional[DeclarativeCursor] = None
6568
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
6669

@@ -158,7 +161,7 @@ def _request_params(
158161
stream_slice,
159162
next_page_token,
160163
self._paginator.get_request_params,
161-
self.stream_slicer.get_request_params,
164+
self.request_option_provider.get_request_params,
162165
)
163166
if isinstance(params, str):
164167
raise ValueError("Request params cannot be a string")
@@ -184,7 +187,7 @@ def _request_body_data(
184187
stream_slice,
185188
next_page_token,
186189
self._paginator.get_request_body_data,
187-
self.stream_slicer.get_request_body_data,
190+
self.request_option_provider.get_request_body_data,
188191
)
189192

190193
def _request_body_json(
@@ -203,7 +206,7 @@ def _request_body_json(
203206
stream_slice,
204207
next_page_token,
205208
self._paginator.get_request_body_json,
206-
self.stream_slicer.get_request_body_json,
209+
self.request_option_provider.get_request_body_json,
207210
)
208211
if isinstance(body_json, str):
209212
raise ValueError("Request body json cannot be a string")
@@ -231,21 +234,21 @@ def _parse_response(
231234
) -> Iterable[Record]:
232235
if not response:
233236
self._last_response = None
234-
return []
235-
236-
self._last_response = response
237-
record_generator = self.record_selector.select_records(
238-
response=response,
239-
stream_state=stream_state,
240-
records_schema=records_schema,
241-
stream_slice=stream_slice,
242-
next_page_token=next_page_token,
243-
)
244-
self._last_page_size = 0
245-
for record in record_generator:
246-
self._last_page_size += 1
247-
self._last_record = record
248-
yield record
237+
yield from []
238+
else:
239+
self._last_response = response
240+
record_generator = self.record_selector.select_records(
241+
response=response,
242+
stream_state=stream_state,
243+
records_schema=records_schema,
244+
stream_slice=stream_slice,
245+
next_page_token=next_page_token,
246+
)
247+
self._last_page_size = 0
248+
for record in record_generator:
249+
self._last_page_size += 1
250+
self._last_record = record
251+
yield record
249252

250253
@property # type: ignore
251254
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:

0 commit comments

Comments
 (0)