Skip to content

Commit f02d74a

Browse files
sherifnadagl-pix
authored andcommitted
🎉 Python CDK: Allow setting network adapter args on outgoing HTTP requests (#4493)
1 parent 7ed6cb2 commit f02d74a

File tree

6 files changed

+42
-16
lines changed

6 files changed

+42
-16
lines changed

‎.github/workflows/publish-cdk-command.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
- name: Checkout Airbyte
2828
uses: actions/checkout@v2
2929
- name: Build CDK Package
30-
run: ./gradlew --no-daemon :airbyte-cdk:python:build
30+
run: ./gradlew --no-daemon --no-build-cache :airbyte-cdk:python:build
3131
- name: Add Failure Comment
3232
if: github.event.inputs.comment-id && !success()
3333
uses: peter-evans/create-or-update-comment@v1

‎airbyte-cdk/python/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.1.5
4+
Allow specifying keyword arguments to be sent on a request made by an HTTP stream: https://github.com/airbytehq/airbyte/pull/4493
5+
36
## 0.1.4
47
Allow to use Python 3.7.0: https://github.com/airbytehq/airbyte/pull/3566
58

‎airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py

+18-6
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,19 @@ def request_body_json(
118118
"""
119119
return None
120120

121+
def request_kwargs(
122+
self,
123+
stream_state: Mapping[str, Any],
124+
stream_slice: Mapping[str, Any] = None,
125+
next_page_token: Mapping[str, Any] = None,
126+
) -> Mapping[str, Any]:
127+
"""
128+
Override to return a mapping of keyword arguments to be used when creating the HTTP request.
129+
Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
130+
this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
131+
"""
132+
return {}
133+
121134
@abstractmethod
122135
def parse_response(
123136
self,
@@ -166,13 +179,13 @@ def _create_prepared_request(
166179
# TODO support non-json bodies
167180
args["json"] = json
168181

169-
return requests.Request(**args).prepare()
182+
return self._session.prepare_request(requests.Request(**args))
170183

171184
# TODO allow configuring these parameters. If we can get this into the requests library, then we can do it without the ugly exception hacks
172185
# see https://github.com/litl/backoff/pull/122
173186
@default_backoff_handler(max_tries=5, factor=5)
174187
@user_defined_backoff_handler(max_tries=5)
175-
def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
188+
def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
176189
"""
177190
Wraps sending the request in rate limit and error handlers.
178191
@@ -190,9 +203,8 @@ def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
190203
Unexpected transient exceptions use the default backoff parameters.
191204
Unexpected persistent exceptions are not handled and will cause the sync to fail.
192205
"""
193-
response: requests.Response = self._session.send(request)
206+
response: requests.Response = self._session.send(request, **request_kwargs)
194207
if self.should_retry(response):
195-
196208
custom_backoff_time = self.backoff_time(response)
197209
if custom_backoff_time:
198210
raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
@@ -224,8 +236,8 @@ def read_records(
224236
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
225237
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
226238
)
227-
228-
response = self._send_request(request)
239+
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
240+
response = self._send_request(request, request_kwargs)
229241
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
230242

231243
next_page_token = self.next_page_token(response)

‎airbyte-cdk/python/docs/concepts/http-streams.md

+5
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe
7171
### Stream Slicing
7272

7373
When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HTTPStream` each Slice is equivalent to a HTTP request; the stream will make one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method in `HttpStream` e.g: `request_params`, `request_headers`, `path`, etc.. to be injected into a request. This allows you to dynamically determine the output of the `request_params`, `path`, and other functions to read the input slice and return the appropriate value.
74+
75+
### Network Adapter Keyword arguments
76+
If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc..
77+
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
78+
be returned as a keyword argument.

‎airbyte-cdk/python/setup.py

+2-9
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
setup(
3737
name="airbyte-cdk",
38-
version="0.1.4",
38+
version="0.1.5",
3939
description="A framework for writing Airbyte Connectors.",
4040
long_description=README,
4141
long_description_content_type="text/markdown",
@@ -73,14 +73,7 @@
7373
"requests",
7474
],
7575
python_requires=">=3.7.0",
76-
extras_require={
77-
"dev": [
78-
"MyPy==0.812",
79-
"pytest",
80-
"pytest-cov",
81-
"pytest-mock",
82-
]
83-
},
76+
extras_require={"dev": ["MyPy==0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]},
8477
entry_points={
8578
"console_scripts": ["base-python=base_python.entrypoint:main"],
8679
},

‎airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py

+13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525

2626
from typing import Any, Iterable, Mapping, Optional
27+
from unittest.mock import ANY
2728

2829
import pytest
2930
import requests
@@ -60,6 +61,18 @@ def parse_response(
6061
yield stubResp
6162

6263

64+
def test_request_kwargs_used(mocker, requests_mock):
65+
stream = StubBasicReadHttpStream()
66+
request_kwargs = {"cert": None, "proxies": "google.com"}
67+
mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs)
68+
mocker.patch.object(stream._session, "send", wraps=stream._session.send)
69+
requests_mock.register_uri("GET", stream.url_base)
70+
71+
list(stream.read_records(sync_mode=SyncMode.full_refresh))
72+
73+
stream._session.send.assert_any_call(ANY, **request_kwargs)
74+
75+
6376
def test_stub_basic_read_http_stream_read_records(mocker):
6477
stream = StubBasicReadHttpStream()
6578
blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.

0 commit comments

Comments
 (0)