diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index c2ceb343d6f9d..9dc9248af76c6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1988,6 +1988,10 @@ definitions: anyOf: - "$ref": "#/definitions/DefaultPaginator" - "$ref": "#/definitions/NoPagination" + ignore_stream_slicer_parameters_on_paginated_requests: + description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored. + type: boolean + default: false partition_router: title: Partition Router description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index c53385bf36afa..ee9ead497d0dc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1328,6 +1328,10 @@ class SimpleRetriever(BaseModel): None, description="Paginator component that describes how to navigate through the API's pages.", ) + ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field( + False, + description='If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.', + ) partition_router: Optional[ Union[ CustomPartitionRouter, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 03e43e4559476..4703c62a07829 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -958,12 +958,17 @@ def create_simple_retriever( cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None paginator = ( self._create_component_from_model( - model=model.paginator, config=config, url_base=url_base, cursor_used_for_stop_condition=cursor_used_for_stop_condition + model=model.paginator, + config=config, + url_base=url_base, + cursor_used_for_stop_condition=cursor_used_for_stop_condition, ) if model.paginator else NoPagination(parameters={}) ) + ignore_stream_slicer_parameters_on_paginated_requests = model.ignore_stream_slicer_parameters_on_paginated_requests or False + if self._limit_slices_fetched or self._emit_connector_builder_messages: return SimpleRetrieverTestReadDecorator( name=name, @@ -975,6 +980,7 @@ def create_simple_retriever( cursor=cursor, config=config, maximum_number_of_slices=self._limit_slices_fetched or 5, + ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, parameters=model.parameters or {}, ) return SimpleRetriever( @@ -986,6 +992,7 @@ def create_simple_retriever( stream_slicer=stream_slicer, cursor=cursor, config=config, + ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, parameters=model.parameters or {}, ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 549a694fd059f..a9c9460449226 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -59,6 +59,7 @@ class SimpleRetriever(Retriever): paginator: Optional[Paginator] = None stream_slicer: StreamSlicer = SinglePartitionRouter(parameters={}) cursor: Optional[Cursor] = None + ignore_stream_slicer_parameters_on_paginated_requests: bool = False def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._paginator = self.paginator or NoPagination(parameters=parameters) @@ -105,12 +106,12 @@ def _get_request_options( Returned merged mapping otherwise """ # FIXME we should eventually remove the usage of stream_state as part of the interpolation - return combine_mappings( - [ - paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - ] - ) + mappings = [ + paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + ] + if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests: + mappings.append(stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)) + return combine_mappings(mappings) def _request_headers( self, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 1747c8ec4d072..438a1497df84e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -273,6 +273,46 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping): pass +@pytest.mark.parametrize( + "test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping", + [ + ("test_do_not_ignore_stream_slicer_params_if_ignore_is_true_but_no_next_page_token", {"key_from_pagination": "1000"}, True, None, {"key_from_pagination": "1000"}), + ("test_do_not_ignore_stream_slicer_params_if_ignore_is_false_and_no_next_page_token", {"key_from_pagination": "1000"}, False, None, {"key_from_pagination": "1000", "key_from_slicer": "value"}), + ("test_ignore_stream_slicer_params_on_paginated_request", {"key_from_pagination": "1000"}, True, {"page": 2}, {"key_from_pagination": "1000"}), + ("test_do_not_ignore_stream_slicer_params_on_paginated_request", {"key_from_pagination": "1000"}, False, {"page": 2}, {"key_from_pagination": "1000", "key_from_slicer": "value"}), + ], +) +def test_ignore_stream_slicer_parameters_on_paginated_requests(test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping): + # This test is separate from the other request options because request headers must be strings + paginator = MagicMock() + paginator.get_request_headers.return_value = paginator_mapping + requester = MagicMock(use_cache=False) + + stream_slicer = MagicMock() + stream_slicer.get_request_headers.return_value = {"key_from_slicer": "value"} + + record_selector = MagicMock() + retriever = SimpleRetriever( + name="stream_name", + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + stream_slicer=stream_slicer, + paginator=paginator, + ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, + parameters={}, + config={}, + ) + + request_option_type_to_method = { + RequestOptionType.header: retriever._request_headers, + } + + for _, method in request_option_type_to_method.items(): + actual_mapping = method(None, None, next_page_token={"next_page_token": "1000"}) + assert expected_mapping == actual_mapping + + @pytest.mark.parametrize( "test_name, slicer_body_data, paginator_body_data, expected_body_data", [