diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 676074d174b3e..b499ea756132f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -229,7 +229,7 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class, in options = kwargs.get(OPTIONS_STR, {}) try: # enums can't accept options - if issubclass(expected_type, enum.Enum): + if issubclass(expected_type, enum.Enum) or self.is_primitive(definition): return expected_type(definition) else: return expected_type(definition, options=options) @@ -237,6 +237,9 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class, in raise Exception(f"failed to instantiate type {expected_type}. {e}") return definition + def is_primitive(self, obj): + return isinstance(obj, (int, float, bool)) + @staticmethod def is_object_definition_with_class_name(definition): return isinstance(definition, dict) and "class_name" in definition diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py index 3a7df2dc7b365..7f7ba913b1089 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py @@ -2,11 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from dataclasses import dataclass -from typing import Optional +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy +from airbyte_cdk.sources.declarative.types import Config from dataclasses_jsonschema import JsonSchemaMixin @@ -19,7 +21,14 @@ class ConstantBackoffStrategy(BackoffStrategy, JsonSchemaMixin): backoff_time_in_seconds (float): time to backoff before retrying a retryable request. """ - backoff_time_in_seconds: float + backoff_time_in_seconds: Union[float, InterpolatedString, str] + options: InitVar[Mapping[str, Any]] + config: Config + + def __post_init__(self, options: Mapping[str, Any]): + if not isinstance(self.backoff_time_in_seconds, InterpolatedString): + self.backoff_time_in_seconds = str(self.backoff_time_in_seconds) + self.backoff_time_in_seconds = InterpolatedString.create(self.backoff_time_in_seconds, options=options) def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: - return self.backoff_time_in_seconds + return self.backoff_time_in_seconds.eval(self.config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py index 75a52ffca3756..aec9c98492f92 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py @@ -2,11 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from dataclasses import dataclass -from typing import Optional +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy +from airbyte_cdk.sources.declarative.types import Config from dataclasses_jsonschema import JsonSchemaMixin @@ -19,7 +21,14 @@ class ExponentialBackoffStrategy(BackoffStrategy, JsonSchemaMixin): factor (float): multiplicative factor """ - factor: float = 5 + options: InitVar[Mapping[str, Any]] + config: Config + factor: Union[float, InterpolatedString, str] = 5 + + def __post_init__(self, options: Mapping[str, Any]): + if not isinstance(self.factor, InterpolatedString): + self.factor = str(self.factor) + self.factor = InterpolatedString.create(self.factor, options=options) def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: - return self.factor * 2**attempt_count + return self.factor.eval(self.config) * 2**attempt_count diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py index 3ff279c78eb5c..1c6e65b872482 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py @@ -3,12 +3,14 @@ # import re -from dataclasses import dataclass -from typing import Optional +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy +from airbyte_cdk.sources.declarative.types import Config from dataclasses_jsonschema import JsonSchemaMixin @@ -22,12 +24,16 @@ class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy, JsonSchemaMixin): regex (Optional[str]): optional regex to apply on the header to extract its value """ - header: str + header: Union[InterpolatedString, str] + options: InitVar[Mapping[str, Any]] + config: Config regex: Optional[str] = None - def __post_init__(self): + def __post_init__(self, options: Mapping[str, Any]): self.regex = re.compile(self.regex) if self.regex else None + self.header = InterpolatedString.create(self.header, options=options) def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: - header_value = get_numeric_value_from_header(response, self.header, self.regex) + header = self.header.eval(config=self.config) + header_value = get_numeric_value_from_header(response, header, self.regex) return header_value diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py index 0e56741035ba6..250ca85c74114 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py @@ -5,12 +5,14 @@ import numbers import re import time -from dataclasses import dataclass -from typing import Optional +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy +from airbyte_cdk.sources.declarative.types import Config from dataclasses_jsonschema import JsonSchemaMixin @@ -26,24 +28,36 @@ class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy, JsonSchemaMixin): regex (Optional[str]): optional regex to apply on the header to extract its value """ - header: str - min_wait: Optional[float] = None - regex: Optional[str] = None + header: Union[InterpolatedString, str] + options: InitVar[Mapping[str, Any]] + config: Config + min_wait: Optional[Union[float, InterpolatedString, str]] = None + regex: Optional[Union[InterpolatedString, str]] = None - def __post_init__(self): - self.regex = re.compile(self.regex) if self.regex else None + def __post_init__(self, options: Mapping[str, Any]): + self.header = InterpolatedString.create(self.header, options=options) + self.regex = InterpolatedString.create(self.regex, options=options) if self.regex else None + if not isinstance(self.min_wait, InterpolatedString): + self.min_wait = InterpolatedString.create(str(self.min_wait), options=options) def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: now = time.time() - wait_until = get_numeric_value_from_header(response, self.header, self.regex) + header = self.header.eval(self.config) + if self.regex: + evaled_regex = self.regex.eval(self.config) + regex = re.compile(evaled_regex) + else: + regex = None + wait_until = get_numeric_value_from_header(response, header, regex) + min_wait = self.min_wait.eval(self.config) if wait_until is None or not wait_until: - return self.min_wait + return min_wait if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(wait_until, numbers.Number): wait_time = float(wait_until) - now else: return self.min_wait - if self.min_wait: - return max(wait_time, self.min_wait) + if min_wait: + return max(wait_time, min_wait) elif wait_time < 0: return None return wait_time diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py index 9757ec2da21a3..9fe56017f9951 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py @@ -94,6 +94,7 @@ class DefaultErrorHandler(ErrorHandler, JsonSchemaMixin): config: Config options: InitVar[Mapping[str, Any]] + config: Config response_filters: Optional[List[HttpResponseFilter]] = None max_retries: Optional[int] = 5 _max_retries: int = field(init=False, repr=False, default=5) @@ -111,7 +112,7 @@ def __post_init__(self, options: Mapping[str, Any]): self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config={}, options={})) if not self.backoff_strategies: - self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()] + self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(options=options, config=self.config)] self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {} diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py index 8ee5af1e57ff6..38b9468909298 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py @@ -8,17 +8,27 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy BACKOFF_TIME = 10 +OPTIONS_BACKOFF_TIME = 20 +CONFIG_BACKOFF_TIME = 30 @pytest.mark.parametrize( - "test_name, attempt_count, expected_backoff_time", + "test_name, attempt_count, backofftime, expected_backoff_time", [ - ("test_exponential_backoff", 1, BACKOFF_TIME), - ("test_exponential_backoff", 2, BACKOFF_TIME), + ("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, BACKOFF_TIME), + ("test_constant_backoff_first_attempt_float", 1, 6.7, 6.7), + ("test_constant_backoff_attempt_round_float", 1.0, 6.7, 6.7), + ("test_constant_backoff_attempt_round_float", 1.5, 6.7, 6.7), + ("test_constant_backoff_first_attempt_round_float", 1, 10.0, BACKOFF_TIME), + ("test_constant_backoff_second_attempt_round_float", 2, 10.0, BACKOFF_TIME), + ("test_constant_backoff_from_options", 1, "{{ options['backoff'] }}", OPTIONS_BACKOFF_TIME), + ("test_constant_backoff_from_config", 1, "{{ config['backoff'] }}", CONFIG_BACKOFF_TIME), ], ) -def test_exponential_backoff(test_name, attempt_count, expected_backoff_time): +def test_constant_backoff(test_name, attempt_count, backofftime, expected_backoff_time): response_mock = MagicMock() - backoff_strategy = ConstantBackoffStrategy(backoff_time_in_seconds=BACKOFF_TIME) + backoff_strategy = ConstantBackoffStrategy( + options={"backoff": OPTIONS_BACKOFF_TIME}, backoff_time_in_seconds=backofftime, config={"backoff": CONFIG_BACKOFF_TIME} + ) backoff = backoff_strategy.backoff(response_mock, attempt_count) assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py index d60a862770afb..eae13908a63d9 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py @@ -9,23 +9,28 @@ ExponentialBackoffStrategy, ) +options = {"backoff": 5} +config = {"backoff": 5} + @pytest.mark.parametrize( - "test_name, attempt_count, expected_backoff_time", + "test_name, attempt_count, factor, expected_backoff_time", [ - ("test_exponential_backoff", 1, 10), - ("test_exponential_backoff", 2, 20), + ("test_exponential_backoff_first_attempt", 1, 5, 10), + ("test_exponential_backoff_second_attempt", 2, 5, 20), + ("test_exponential_backoff_from_options", 2, "{{options['backoff']}}", 20), + ("test_exponential_backoff_from_config", 2, "{{config['backoff']}}", 20), ], ) -def test_exponential_backoff(test_name, attempt_count, expected_backoff_time): +def test_exponential_backoff(test_name, attempt_count, factor, expected_backoff_time): response_mock = MagicMock() - backoff_strategy = ExponentialBackoffStrategy(factor=5) + backoff_strategy = ExponentialBackoffStrategy(factor=factor, options=options, config=config) backoff = backoff_strategy.backoff(response_mock, attempt_count) assert backoff == expected_backoff_time def test_exponential_backoff_default(): response_mock = MagicMock() - backoff_strategy = ExponentialBackoffStrategy() + backoff_strategy = ExponentialBackoffStrategy(options=options, config=config) backoff = backoff_strategy.backoff(response_mock, 3) assert backoff == 40 diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py index 915a13762d282..bd1be514d2196 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py @@ -17,6 +17,8 @@ [ ("test_wait_time_from_header", "wait_time", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME), ("test_wait_time_from_header_string", "wait_time", "60", None, SOME_BACKOFF_TIME), + ("test_wait_time_from_header_options", "{{ options['wait_time'] }}", "60", None, SOME_BACKOFF_TIME), + ("test_wait_time_from_header_config", "{{ config['wait_time'] }}", "60", None, SOME_BACKOFF_TIME), ("test_wait_time_from_header_not_a_number", "wait_time", "61,60", None, None), ("test_wait_time_from_header_with_regex", "wait_time", "61,60", "([-+]?\d+)", 61), # noqa ("test_wait_time_fœrom_header_with_regex_no_match", "wait_time", "...", "[-+]?\d+", None), # noqa @@ -26,6 +28,8 @@ def test_wait_time_from_header(test_name, header, header_value, regex, expected_backoff_time): response_mock = MagicMock() response_mock.headers = {"wait_time": header_value} - backoff_stratery = WaitTimeFromHeaderBackoffStrategy(header, regex) + backoff_stratery = WaitTimeFromHeaderBackoffStrategy( + header=header, regex=regex, options={"wait_time": "wait_time"}, config={"wait_time": "wait_time"} + ) backoff = backoff_stratery.backoff(response_mock, 1) assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py index 713ee4c4896fe..1ad5afbfa57dd 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py @@ -11,26 +11,54 @@ ) SOME_BACKOFF_TIME = 60 +REGEX = "[-+]?\\d+" @pytest.mark.parametrize( "test_name, header, wait_until, min_wait, regex, expected_backoff_time", [ ("test_wait_until_time_from_header", "wait_until", 1600000060.0, None, None, 60), + ("test_wait_until_time_from_header_options", "{{options['wait_until']}}", 1600000060.0, None, None, 60), + ("test_wait_until_time_from_header_config", "{{config['wait_until']}}", 1600000060.0, None, None, 60), ("test_wait_until_negative_time", "wait_until", 1500000000.0, None, None, None), ("test_wait_until_time_less_than_min", "wait_until", 1600000060.0, 120, None, 120), ("test_wait_until_no_header", "absent_header", 1600000000.0, None, None, None), ("test_wait_until_time_from_header_not_numeric", "wait_until", "1600000000,1600000000", None, None, None), ("test_wait_until_time_from_header_is_numeric", "wait_until", "1600000060", None, None, 60), ("test_wait_until_time_from_header_with_regex", "wait_until", "1600000060,60", None, "[-+]?\d+", 60), # noqa + ("test_wait_until_time_from_header_with_regex_from_options", "wait_until", "1600000060,60", None, "{{options['regex']}}", 60), + # noqa + ("test_wait_until_time_from_header_with_regex_from_config", "wait_until", "1600000060,60", None, "{{config['regex']}}", 60), # noqa ("test_wait_until_time_from_header_with_regex_no_match", "wait_time", "...", None, "[-+]?\d+", None), # noqa ("test_wait_until_no_header_with_min", "absent_header", "1600000000.0", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME), + ( + "test_wait_until_no_header_with_min_from_options", + "absent_header", + "1600000000.0", + "{{options['min_wait']}}", + None, + SOME_BACKOFF_TIME, + ), + ( + "test_wait_until_no_header_with_min_from_config", + "absent_header", + "1600000000.0", + "{{config['min_wait']}}", + None, + SOME_BACKOFF_TIME, + ), ], ) @patch("time.time", return_value=1600000000.0) def test_wait_untiltime_from_header(time_mock, test_name, header, wait_until, min_wait, regex, expected_backoff_time): response_mock = MagicMock() response_mock.headers = {"wait_until": wait_until} - backoff_stratery = WaitUntilTimeFromHeaderBackoffStrategy(header, min_wait, regex) + backoff_stratery = WaitUntilTimeFromHeaderBackoffStrategy( + header=header, + min_wait=min_wait, + regex=regex, + options={"wait_until": "wait_until", "regex": REGEX, "min_wait": SOME_BACKOFF_TIME}, + config={"wait_until": "wait_until", "regex": REGEX, "min_wait": SOME_BACKOFF_TIME}, + ) backoff = backoff_stratery.backoff(response_mock, 1) assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py index 6bc94aa6be758..ec8793e9f695e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py @@ -30,7 +30,7 @@ None, {}, ResponseStatus.retry(SOME_BACKOFF_TIME), - [ConstantBackoffStrategy(SOME_BACKOFF_TIME)], + [ConstantBackoffStrategy(options={}, backoff_time_in_seconds=SOME_BACKOFF_TIME, config={})], ), ("test_exponential_backoff", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None), ( @@ -40,7 +40,7 @@ None, {}, ResponseStatus.retry(10), - [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()], + [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(options={}, config={})], ), ("test_chain_backoff_strategy", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None), ( @@ -50,7 +50,10 @@ None, {}, ResponseStatus.retry(10), - [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(), ConstantBackoffStrategy(SOME_BACKOFF_TIME)], + [ + DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(options={}, config={}), + ConstantBackoffStrategy(options={}, backoff_time_in_seconds=SOME_BACKOFF_TIME, config={}), + ], ), ("test_200", HTTPStatus.OK, None, None, {}, response_status.SUCCESS, None), ("test_3XX", HTTPStatus.PERMANENT_REDIRECT, None, None, {}, response_status.SUCCESS, None), diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index fde611268a425..600044fbc1191 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -415,31 +415,31 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel ( "test_option_in_selector", """ - extractor: - type: DpathExtractor - field_pointer: ["{{ options['name'] }}"] - selector: - class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector - $options: - name: "selector" - extractor: "*ref(extractor)" - """, + extractor: + type: DpathExtractor + field_pointer: ["{{ options['name'] }}"] + selector: + class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector + $options: + name: "selector" + extractor: "*ref(extractor)" + """, "selector", ), ( "test_option_in_extractor", """ - extractor: - type: DpathExtractor - $options: - name: "extractor" - field_pointer: ["{{ options['name'] }}"] - selector: - class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector - $options: - name: "selector" - extractor: "*ref(extractor)" - """, + extractor: + type: DpathExtractor + $options: + name: "extractor" + field_pointer: ["{{ options['name'] }}"] + selector: + class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector + $options: + name: "selector" + extractor: "*ref(extractor)" + """, "extractor", ), ], @@ -451,8 +451,53 @@ def test_options_propagation(test_name, content, expected_field_pointer_value): assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value -def test_create_requester(): - content = """ +@pytest.mark.parametrize( + "test_name, error_handler", + [ + ( + "test_create_requester_constant_error_handler", + """ + error_handler: + backoff_strategies: + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 5 + """, + ), + ( + "test_create_requester_exponential_error_handler", + """ + error_handler: + backoff_strategies: + - type: "ExponentialBackoffStrategy" + factor: 5 + """, + ), + ( + "test_create_requester_wait_time_from_header_error_handler", + """ + error_handler: + backoff_strategies: + - type: "WaitTimeFromHeader" + header: "a_header" + """, + ), + ( + "test_create_requester_wait_time_until_from_header_error_handler", + """ + error_handler: + backoff_strategies: + - type: "WaitUntilTimeFromHeader" + header: "a_header" + """, + ), + ( + "test_create_requester_no_error_handler", + """""", + ), + ], +) +def test_create_requester(test_name, error_handler): + content = f""" requester: type: HttpRequester path: "/v3/marketing/lists" @@ -461,13 +506,14 @@ def test_create_requester(): url_base: "https://api.sendgrid.com" authenticator: type: "BasicHttpAuthenticator" - username: "{{ options.name }}" - password: "{{ config.apikey }}" + username: "{{{{ options.name}}}}" + password: "{{{{ config.apikey }}}}" request_options_provider: request_parameters: a_parameter: "something_here" request_headers: header: header_value + {error_handler} """ config = parser.parse(content) @@ -498,7 +544,6 @@ def test_create_composite_error_handler(): - response_filters: - http_codes: [ 403 ] action: RETRY - error_message: "Retryable error received: {{ response.message }}" """ config = parser.parse(content) @@ -510,7 +555,6 @@ def test_create_composite_error_handler(): assert isinstance(component.error_handlers[0].response_filters[0], HttpResponseFilter) assert component.error_handlers[0].response_filters[0].predicate.condition == "{{ 'code' in response }}" assert component.error_handlers[1].response_filters[0].http_codes == [403] - assert component.error_handlers[1].response_filters[0].error_message.string == "Retryable error received: {{ response.message }}" assert isinstance(component, CompositeErrorHandler) @@ -685,37 +729,6 @@ def test_add_fields(self): ] assert expected == component.transformations - def test_add_fields_path_in_options(self): - content = f""" - the_stream: - class_name: airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream - $options: - {self.base_options} - path: "/wrong_path" - transformations: - - type: AddFields - fields: - - path: ["field1"] - value: "static_value" - """ - config = parser.parse(content) - - factory.create_component(config["the_stream"], input_config, False) - - component = factory.create_component(config["the_stream"], input_config)() - assert isinstance(component, DeclarativeStream) - expected = [ - AddFields( - fields=[ - AddedFieldDefinition( - path=["field1"], value=InterpolatedString(string="static_value", default="static_value", options={}), options={} - ) - ], - options={}, - ) - ] - assert expected == component.transformations - def test_validation_wrong_input_type(): content = """ diff --git a/docs/connector-development/config-based/understanding-the-yaml-file/error-handling.md b/docs/connector-development/config-based/understanding-the-yaml-file/error-handling.md index fdb3a6f07d61a..f229190c74255 100644 --- a/docs/connector-development/config-based/understanding-the-yaml-file/error-handling.md +++ b/docs/connector-development/config-based/understanding-the-yaml-file/error-handling.md @@ -157,8 +157,8 @@ Schema: BackoffStrategy: type: object anyOf: - - "$ref": "#/definitions/ExponentialBackoff" - - "$ref": "#/definitions/ConstantBackoff" + - "$ref": "#/definitions/ExponentialBackoffStrategy" + - "$ref": "#/definitions/ConstantBackoffStrategy" - "$ref": "#/definitions/WaitTimeFromHeader" - "$ref": "#/definitions/WaitUntilTimeFromHeader" ``` @@ -170,7 +170,7 @@ This is the default backoff strategy. The requester will backoff with an exponen Schema: ```yaml - ExponentialBackoff: + ExponentialBackoffStrategy: type: object additionalProperties: true properties: @@ -183,12 +183,12 @@ Schema: ### Constant Backoff -When using the `ConstantBackoff` strategy, the requester will backoff with a constant interval. +When using the `ConstantBackoffStrategy` strategy, the requester will backoff with a constant interval. Schema: ```yaml - ConstantBackoff: + ConstantBackoffStrategy: type: object additionalProperties: true required: @@ -340,13 +340,13 @@ requester: - predicate: "{{ 'code' in response }}" action: RETRY backoff_strategies: - - type: "ConstantBackoff" + - type: "ConstantBackoffStrategy" backoff_time_in_seconds: 5 - response_filters: - http_codes: [ 403 ] action: RETRY backoff_strategies: - - type: "ExponentialBackoff" + - type: "ExponentialBackoffStrategy" ``` ## More readings