Skip to content

Commit 5724ca0

Browse files
authored
Add ignore_stream_slicer_parameters_on_paginated_requests flag (#35462)
1 parent c9b7d8a commit 5724ca0

File tree

5 files changed

+63
-7
lines changed

5 files changed

+63
-7
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2001,6 +2001,10 @@ definitions:
20012001
anyOf:
20022002
- "$ref": "#/definitions/DefaultPaginator"
20032003
- "$ref": "#/definitions/NoPagination"
2004+
ignore_stream_slicer_parameters_on_paginated_requests:
2005+
description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
2006+
type: boolean
2007+
default: false
20042008
partition_router:
20052009
title: Partition Router
20062010
description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.

airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,10 @@ class SimpleRetriever(BaseModel):
13381338
None,
13391339
description="Paginator component that describes how to navigate through the API's pages.",
13401340
)
1341+
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
1342+
False,
1343+
description='If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.',
1344+
)
13411345
partition_router: Optional[
13421346
Union[
13431347
CustomPartitionRouter,

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,12 +958,17 @@ def create_simple_retriever(
958958
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
959959
paginator = (
960960
self._create_component_from_model(
961-
model=model.paginator, config=config, url_base=url_base, cursor_used_for_stop_condition=cursor_used_for_stop_condition
961+
model=model.paginator,
962+
config=config,
963+
url_base=url_base,
964+
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
962965
)
963966
if model.paginator
964967
else NoPagination(parameters={})
965968
)
966969

970+
ignore_stream_slicer_parameters_on_paginated_requests = model.ignore_stream_slicer_parameters_on_paginated_requests or False
971+
967972
if self._limit_slices_fetched or self._emit_connector_builder_messages:
968973
return SimpleRetrieverTestReadDecorator(
969974
name=name,
@@ -975,6 +980,7 @@ def create_simple_retriever(
975980
cursor=cursor,
976981
config=config,
977982
maximum_number_of_slices=self._limit_slices_fetched or 5,
983+
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
978984
parameters=model.parameters or {},
979985
)
980986
return SimpleRetriever(
@@ -986,6 +992,7 @@ def create_simple_retriever(
986992
stream_slicer=stream_slicer,
987993
cursor=cursor,
988994
config=config,
995+
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
989996
parameters=model.parameters or {},
990997
)
991998

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class SimpleRetriever(Retriever):
5959
paginator: Optional[Paginator] = None
6060
stream_slicer: StreamSlicer = SinglePartitionRouter(parameters={})
6161
cursor: Optional[Cursor] = None
62+
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
6263

6364
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
6465
self._paginator = self.paginator or NoPagination(parameters=parameters)
@@ -105,12 +106,12 @@ def _get_request_options(
105106
Returned merged mapping otherwise
106107
"""
107108
# FIXME we should eventually remove the usage of stream_state as part of the interpolation
108-
return combine_mappings(
109-
[
110-
paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
111-
stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
112-
]
113-
)
109+
mappings = [
110+
paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
111+
]
112+
if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
113+
mappings.append(stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token))
114+
return combine_mappings(mappings)
114115

115116
def _request_headers(
116117
self,

airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,46 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping):
273273
pass
274274

275275

276+
@pytest.mark.parametrize(
    "test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping",
    [
        (
            "test_do_not_ignore_stream_slicer_params_if_ignore_is_true_but_no_next_page_token",
            {"key_from_pagination": "1000"},
            True,
            None,
            # First request (no page token): slicer options are kept even when the flag is set.
            {"key_from_pagination": "1000", "key_from_slicer": "value"},
        ),
        (
            "test_do_not_ignore_stream_slicer_params_if_ignore_is_false_and_no_next_page_token",
            {"key_from_pagination": "1000"},
            False,
            None,
            {"key_from_pagination": "1000", "key_from_slicer": "value"},
        ),
        (
            "test_ignore_stream_slicer_params_on_paginated_request",
            {"key_from_pagination": "1000"},
            True,
            {"page": 2},
            {"key_from_pagination": "1000"},
        ),
        (
            "test_do_not_ignore_stream_slicer_params_on_paginated_request",
            {"key_from_pagination": "1000"},
            False,
            {"page": 2},
            {"key_from_pagination": "1000", "key_from_slicer": "value"},
        ),
    ],
)
def test_ignore_stream_slicer_parameters_on_paginated_requests(
    test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping
):
    """Check when stream-slicer request headers are merged into a request.

    The slicer's headers are expected to be dropped only when BOTH conditions
    hold: ``ignore_stream_slicer_parameters_on_paginated_requests`` is true and
    a ``next_page_token`` is present. In every other combination the
    paginator and slicer headers are merged.
    """
    paginator = MagicMock()
    paginator.get_request_headers.return_value = paginator_mapping
    requester = MagicMock(use_cache=False)

    stream_slicer = MagicMock()
    stream_slicer.get_request_headers.return_value = {"key_from_slicer": "value"}

    record_selector = MagicMock()
    retriever = SimpleRetriever(
        name="stream_name",
        primary_key=primary_key,
        requester=requester,
        record_selector=record_selector,
        stream_slicer=stream_slicer,
        paginator=paginator,
        ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
        parameters={},
        config={},
    )

    # Forward the parametrized token: hard-coding one here would make every
    # case behave like a paginated request and hide the first-page behavior.
    actual_mapping = retriever._request_headers(None, None, next_page_token=next_page_token)

    assert expected_mapping == actual_mapping
315+
276316
@pytest.mark.parametrize(
277317
"test_name, slicer_body_data, paginator_body_data, expected_body_data",
278318
[

0 commit comments

Comments
 (0)