diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py index 2da7f6272f218..b37652383b586 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py @@ -56,7 +56,7 @@ def matches(self, response: requests.Response) -> Optional[ResponseAction]: return None def _response_matches_predicate(self, response: requests.Response) -> bool: - return self.predicate and self.predicate.eval(None, response=response.json()) + return self.predicate and self.predicate.eval(None, response=response.json(), headers=response.headers) def _response_contains_error_message(self, response: requests.Response) -> bool: if not self.error_message_contains: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index 09d036580f8f2..0b28db18ff8a4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -39,10 +39,15 @@ def __post_init__(self, options: Mapping[str, Any]): def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: decoded_response = self.decoder.decode(response) + + # The default way that link is presented in requests.Response is a string of various links (last, next, etc). This + # is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links headers = response.headers + headers["link"] = response.links + if self.stop_condition: should_stop = self.stop_condition.eval(self.config, response=decoded_response, headers=headers, last_records=last_records) if should_stop: return None - token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response) + token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response, headers=headers) return token if token else None diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py index 091fc0293bf0b..eca5e4a71bd89 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py @@ -119,6 +119,15 @@ ResponseStatus.retry(10), None, ), + ( + "test_200_fail_with_predicate_from_header", + HTTPStatus.OK, + HttpResponseFilter(action=ResponseAction.FAIL, predicate="{{ headers['fail'] }}", options={}), + None, + {"fail": True}, + response_status.FAIL, + None, + ), ], ) def test_default_error_handler( diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py index 0299bd5873414..2f3600e03cd2d 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py @@ -20,8 +20,15 @@ ("test_token_from_response", "{{ response._metadata.content }}", None, "content_value"), ("test_token_from_options", "{{ options.key }}", None, "value"), ("test_token_not_found", "{{ response.invalid_key }}", None, None), - ("test_static_token_with_stop_condition_false", "token", InterpolatedBoolean(condition="{{False}}", options={}), "token"), - ("test_static_token_with_stop_condition_true", "token", InterpolatedBoolean(condition="{{True}}", options={}), None), + ("test_static_token_with_stop_condition_false", "token", InterpolatedBoolean("{{False}}", options={}), "token"), + ("test_static_token_with_stop_condition_true", "token", InterpolatedBoolean("{{True}}", options={}), None), + ("test_token_from_header", "{{ headers.next }}", InterpolatedBoolean("{{ not headers.has_more }}", options={}), "ready_to_go"), + ( + "test_token_from_response_header_links", + "{{ headers.link.next.url }}", + InterpolatedBoolean("{{ not headers.link.next.url }}", options={}), + "https://adventure.io/api/v1/records?page=2&per_page=100", + ), ], ) def test_cursor_pagination_strategy(test_name, template_string, stop_condition, expected_token): @@ -33,7 +40,8 @@ def test_cursor_pagination_strategy(test_name, template_string, stop_condition, ) response = requests.Response() - response.headers = {"has_more": True} + link_str = '; rel="next"' + response.headers = {"has_more": True, "next": "ready_to_go", "link": link_str} response_body = {"_metadata": {"content": "content_value"}, "accounts": [], "end": 99, "total": 200, "characters": {}} response._content = json.dumps(response_body).encode("utf-8") last_records = [{"id": 0, "more_records": True}, {"id": 1, "more_records": True}]