Skip to content

[low-code]: Evaluate backoff strategies at runtime #18053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,17 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class, in
options = kwargs.get(OPTIONS_STR, {})
try:
# enums can't accept options
if issubclass(expected_type, enum.Enum):
if issubclass(expected_type, enum.Enum) or self.isBuiltinTypes(definition):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't pass the options parameter if the expected_type is an int, float, or bool

return expected_type(definition)
else:
return expected_type(definition, options=options)
except Exception as e:
raise Exception(f"failed to instantiate type {expected_type}. {e}")
return definition

def isBuiltinTypes(self, obj):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snake case instead?

return isinstance(obj, (int, float, bool))

@staticmethod
def is_object_definition_with_class_name(definition):
return isinstance(definition, dict) and "class_name" in definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Optional
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin


Expand All @@ -19,7 +21,14 @@ class ConstantBackoffStrategy(BackoffStrategy, JsonSchemaMixin):
backoff_time_in_seconds (float): time to backoff before retrying a retryable request.
"""

backoff_time_in_seconds: float
backoff_time_in_seconds: Union[float, InterpolatedString, str]
options: InitVar[Mapping[str, Any]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add options and config parameters

config: Config

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.backoff_time_in_seconds, InterpolatedString):
self.backoff_time_in_seconds = str(self.backoff_time_in_seconds)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convert to string if needed

self.backoff_time_in_seconds = InterpolatedString.create(self.backoff_time_in_seconds, options=options)

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
return self.backoff_time_in_seconds
return self.backoff_time_in_seconds.eval(self.config)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eval at runtime

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#

import re
from dataclasses import dataclass
from typing import Optional
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header
Expand All @@ -23,9 +23,10 @@ class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy, JsonSchemaMixin):
"""

header: str
options: InitVar[Mapping[str, Any]]
regex: Optional[str] = None

def __post_init__(self):
def __post_init__(self, options: Mapping[str, Any]):
self.regex = re.compile(self.regex) if self.regex else None

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy

BACKOFF_TIME = 10
OPTIONS_BACKOFF_TIME = 20
CONFIG_BACKOFF_TIME = 30


@pytest.mark.parametrize(
"test_name, attempt_count, expected_backoff_time",
"test_name, attempt_count, backofftime, expected_backoff_time",
[
("test_exponential_backoff", 1, BACKOFF_TIME),
("test_exponential_backoff", 2, BACKOFF_TIME),
("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, BACKOFF_TIME),
("test_constant_backoff_second_attempt", 2, BACKOFF_TIME, BACKOFF_TIME),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a test for a float? Right now eval() would end up casting these to ints

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai are you referring to the backoff time or the attempt count? I added tests for both.

  1. backoff time is a float -> backoff time is evaluated as a float
  2. attempt count is a float -> the value is actually never used. Python auto casts it to int. Do you think we should be stricter and fail if we pass a float instead of an int?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just meant for backoff time, we were using the constants for BACKOFF_TIME which were ints, so just adding a true float. I was messing around with things and using the constants eval would come back as int, figured we should just test a float returned too just in case. But I see you added it already so all good!

("test_constant_backoff_from_options", 1, "{{ options['backoff'] }}", OPTIONS_BACKOFF_TIME),
("test_constant_backoff_from_config", 1, "{{ config['backoff'] }}", CONFIG_BACKOFF_TIME),
],
)
def test_exponential_backoff(test_name, attempt_count, expected_backoff_time):
def test_constant_backoff(test_name, attempt_count, backofftime, expected_backoff_time):
response_mock = MagicMock()
backoff_strategy = ConstantBackoffStrategy(backoff_time_in_seconds=BACKOFF_TIME)
backoff_strategy = ConstantBackoffStrategy(
options={"backoff": OPTIONS_BACKOFF_TIME}, backoff_time_in_seconds=backofftime, config={"backoff": CONFIG_BACKOFF_TIME}
)
backoff = backoff_strategy.backoff(response_mock, attempt_count)
assert backoff == expected_backoff_time
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
None,
{},
ResponseStatus.retry(SOME_BACKOFF_TIME),
[ConstantBackoffStrategy(SOME_BACKOFF_TIME)],
[ConstantBackoffStrategy(options={}, backoff_time_in_seconds=SOME_BACKOFF_TIME, config={})],
),
("test_exponential_backoff", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None),
(
Expand All @@ -50,7 +50,10 @@
None,
{},
ResponseStatus.retry(10),
[DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(), ConstantBackoffStrategy(SOME_BACKOFF_TIME)],
[
DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(),
ConstantBackoffStrategy(options={}, backoff_time_in_seconds=SOME_BACKOFF_TIME, config={}),
],
),
("test_200", HTTPStatus.OK, None, None, {}, response_status.SUCCESS, None),
("test_3XX", HTTPStatus.PERMANENT_REDIRECT, None, None, {}, response_status.SUCCESS, None),
Expand Down
44 changes: 24 additions & 20 deletions airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,31 +414,31 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel
(
"test_option_in_selector",
"""
extractor:
type: DpathExtractor
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
extractor:
type: DpathExtractor
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
"selector",
),
(
"test_option_in_extractor",
"""
extractor:
type: DpathExtractor
$options:
name: "extractor"
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
extractor:
type: DpathExtractor
$options:
name: "extractor"
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
"extractor",
),
],
Expand Down Expand Up @@ -467,6 +467,10 @@ def test_create_requester():
a_parameter: "something_here"
request_headers:
header: header_value
error_handler:
backoff_strategies:
- type: "ConstantBackoffStrategy"
backoff_time_in_seconds: 5
"""
config = parser.parse(content)

Expand Down