Skip to content

Add new InterpolatedRequestOptionsProvider that encapsulates all variations of request arguments #13472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
InterpolatedRequestHeaderProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_headers.request_header_provider import RequestHeaderProvider
from airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider import (
InterpolatedRequestParameterProvider,
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_params.request_parameters_provider import RequestParameterProvider
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier
from airbyte_cdk.sources.declarative.types import Config
Expand All @@ -28,14 +28,16 @@ def __init__(
url_base: [str, InterpolatedString],
path: [str, InterpolatedString],
http_method: Union[str, HttpMethod],
request_parameters_provider: RequestParameterProvider = None,
request_options_provider: RequestOptionsProvider = None,
request_headers_provider: RequestHeaderProvider = None,
authenticator: HttpAuthenticator,
retrier: Retrier,
config: Config,
):
if request_parameters_provider is None:
request_parameters_provider = InterpolatedRequestParameterProvider(config=config, request_headers={})
if request_options_provider is None:
request_options_provider = InterpolatedRequestOptionsProvider(
config=config, request_parameters={}, request_body_data="", request_body_json={}
)
if request_headers_provider is None:
request_headers_provider = InterpolatedRequestHeaderProvider(config=config, request_headers={})
self._name = name
Expand All @@ -49,15 +51,15 @@ def __init__(
if type(http_method) == str:
http_method = HttpMethod[http_method]
self._method = http_method
self._request_parameters_provider = request_parameters_provider
self._request_options_provider = request_options_provider
self._request_headers_provider = request_headers_provider
self._retrier = retrier
self._config = config

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return self._request_parameters_provider.request_params(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token)

def get_authenticator(self):
return self._authenticator
Expand Down Expand Up @@ -100,20 +102,17 @@ def request_headers(
def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
# FIXME: this should be declarative
return dict()
return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token)

def request_body_json(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Mapping]:
# FIXME: this should be declarative
return dict()
return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai should we also add the POST method to HttpMethod while we're at it so we can knock off #12884 at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already added "POST" to the enum in airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py. I tested it in factory and that did propagate POST to the stream. I wasn't aware of other places that need to be changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai I missed that. looks good then!


def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
# FIXME: this should be declarative
return dict()
return self._request_options_provider.request_kwargs(stream_state, stream_slice, next_page_token)

@property
def cache_filename(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping, MutableMapping
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation


class InterpolatedRequestInputProvider:
"""
Helper class that generically performs string interpolation on the provided dictionary input
Helper class that generically performs string interpolation on the provided dictionary or string input
"""

def __init__(self, *, config, request_inputs=None):
self._config = config

if request_inputs is None:
request_inputs = {}
self._interpolator = InterpolatedMapping(request_inputs, JinjaInterpolation())
self._config = config
if isinstance(request_inputs, str):
self._interpolator = InterpolatedString(request_inputs, "")
else:
self._interpolator = InterpolatedMapping(request_inputs, JinjaInterpolation())

def request_inputs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
) -> Union[Mapping, str]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
interpolated_values = self._interpolator.eval(self._config, **kwargs) # dig into this function a little more
non_null_tokens = {k: v for k, v in interpolated_values.items() if v}
return non_null_tokens
interpolated_value = self._interpolator.eval(self._config, **kwargs)

if isinstance(interpolated_value, dict):
non_null_tokens = {k: v for k, v in interpolated_value.items() if v}
return non_null_tokens
return interpolated_value
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.requesters.interpolated_request_input_provider import InterpolatedRequestInputProvider
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider


class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
def __init__(self, *, config, request_parameters=None, request_body_data=None, request_body_json=None):
if request_parameters is None:
request_parameters = {}
if request_body_data is None:
request_body_data = ""
if request_body_json is None:
request_body_json = {}

if request_body_json and request_body_data:
raise ValueError("RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both")

self._parameter_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_parameters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we generify InterpolatedRequestInputProvider, InterpolatedRequestBodyDataProvider, and InterpolatedRequestInputProvider? At the end of the day they're just InterpolatedRequestInputProvider passing down
{"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} to the interpolator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see what you mean, it does seem a bit weird to have so many underlying interpolators. The tricky part is that as part of initializing the map interpolator, we pass in the template mapping that we're going to interpolate. So even though the inputs to eval are the same, the underlying interpolator will have to change. A few options we have are:

  1. Dynamically switch the interpolator._mapping field based on an additional parameter for eval() indicating params, data, json, etc.
  2. Create a new interpolator that supports multiple mappings and based on a new map key parameter (string) for request_inputs() which will specify which template to use for the interpolation.

Option 1 feels like an absolute mess to couple object field initialization with eval. But option 2 seems doable. However, I'm still think that having 3 different interpolators despite being pretty verbose is still the clearest way to organize how a template is linked to an interpolator. With option 2 we're just hiding the complexity under another class although with the benefit of only needing one jinja interpolation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as per slack discussion:
Still will have 3 different interpolators. But will be getting rid of the InterpolatedRequestBodyDataProvider class entirely in favor of making InterpolatedRequestInputProvider more generic and using it for all three types.

self._body_data_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_data)
self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}

def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_json(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Mapping]:
return self._body_json_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_kwargs(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a cursory search of which integrations actually use this request_kwargs() method. Many implement this method but just return an empty dictionary with the exception of source-zendesk-chat and source-iterable. I didn't dig super deep why what looks like query parameters had to go into kwargs, but for now I think its best to not include this component just yet.

self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
# todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing
# constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL
return {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Mapping, MutableMapping, Optional, Union


class RequestOptionsProvider(ABC):
@abstractmethod
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
pass

@abstractmethod
def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
pass

@abstractmethod
def request_body_json(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Mapping]:
pass

@abstractmethod
def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
pass

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

class HttpMethod(Enum):
GET = "GET"
POST = "POST"


class Requester(ABC):
Expand Down
9 changes: 7 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .rate_limiting import default_backoff_handler, user_defined_backoff_handler

# list of all possible HTTP methods which can be used for sending of request bodies
BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH")
BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")

logging.getLogger("vcr").setLevel(logging.ERROR)

Expand Down Expand Up @@ -248,7 +248,12 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
return None

def _create_prepared_request(
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
self,
path: str,
headers: Mapping = None,
params: Mapping = None,
json: Any = None,
data: Any = None,
) -> requests.PreparedRequest:
args = {"method": self.http_method, "url": urljoin(self.url_base, path), "headers": headers, "params": params}
if self.http_method.upper() in BODY_REQUEST_METHODS:
Expand Down

This file was deleted.

Loading