Skip to content

Commit acd1533

Browse files
authored
🎉 Python CDK: handle requests.exceptions.ChunkedEncodingError for broken connections (#13260)
1 parent 3d9a972 commit acd1533

File tree

8 files changed

+35
-25
lines changed

8 files changed

+35
-25
lines changed

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.1.60
4+
- Add `requests.exceptions.ChunkedEncodingError` to transient errors so it could be retried
5+
36
## 0.1.59
47
- Add `Stream.get_error_display_message()` to retrieve user-friendly messages from exceptions encountered while reading streams.
58
- Add default error error message retrieval logic for `HTTPStream`s following common API patterns.

airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@
1212

1313
from .exceptions import DefaultBackoffException, UserDefinedBackoffException
1414

15-
TRANSIENT_EXCEPTIONS = (DefaultBackoffException, exceptions.ConnectTimeout, exceptions.ReadTimeout, exceptions.ConnectionError)
15+
TRANSIENT_EXCEPTIONS = (
16+
DefaultBackoffException,
17+
exceptions.ConnectTimeout,
18+
exceptions.ReadTimeout,
19+
exceptions.ConnectionError,
20+
exceptions.ChunkedEncodingError,
21+
)
1622

1723
logger = logging.getLogger("airbyte")
1824

airbyte-cdk/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
setup(
1717
name="airbyte-cdk",
18-
version="0.1.59",
18+
version="0.1.60",
1919
description="A framework for writing Airbyte Connectors.",
2020
long_description=README,
2121
long_description_content_type="text/markdown",

airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -248,20 +248,22 @@ def test_raise_on_http_errors_off_non_retryable_4xx(mocker, status_code):
248248
assert response.status_code == status_code
249249

250250

251-
def test_raise_on_http_errors_off_timeout(requests_mock):
252-
stream = AutoFailFalseHttpStream()
253-
requests_mock.register_uri("GET", stream.url_base, exc=requests.exceptions.ConnectTimeout)
254-
255-
with pytest.raises(requests.exceptions.ConnectTimeout):
256-
list(stream.read_records(SyncMode.full_refresh))
257-
258-
259-
def test_raise_on_http_errors_off_connection_error(requests_mock):
251+
@pytest.mark.parametrize(
252+
"error",
253+
(
254+
requests.exceptions.ConnectTimeout,
255+
requests.exceptions.ConnectionError,
256+
requests.exceptions.ChunkedEncodingError,
257+
requests.exceptions.ReadTimeout,
258+
),
259+
)
260+
def test_raise_on_http_errors(mocker, error):
260261
stream = AutoFailFalseHttpStream()
261-
requests_mock.register_uri("GET", stream.url_base, exc=requests.exceptions.ConnectionError)
262+
send_mock = mocker.patch.object(requests.Session, "send", side_effect=error())
262263

263-
with pytest.raises(requests.exceptions.ConnectionError):
264+
with pytest.raises(error):
264265
list(stream.read_records(SyncMode.full_refresh))
266+
assert send_mock.call_count == stream.max_retries + 1
265267

266268

267269
class PostHttpStream(StubBasicReadHttpStream):

airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ protected Optional<NamingConventionTransformer> getNameTransformer() {
128128

129129
@Override
130130
protected void assertNamespaceNormalization(final String testCaseId,
131-
final String expectedNormalizedNamespace,
132-
final String actualNormalizedNamespace) {
131+
final String expectedNormalizedNamespace,
132+
final String actualNormalizedNamespace) {
133133
final String message = String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId);
134134
if (testCaseId.equals("S3A-1")) {
135135
// bigquery allows namespace starting with a number, and prepending underscore
@@ -155,9 +155,9 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
155155

156156
@Override
157157
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
158-
final String streamName,
159-
final String namespace,
160-
final JsonNode streamSchema)
158+
final String streamName,
159+
final String namespace,
160+
final JsonNode streamSchema)
161161
throws Exception {
162162
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
163163
.stream()

airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ class BigQueryDestinationTest {
111111
private static Stream<Arguments> datasetIdResetterProvider() {
112112
// parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
113113
return Stream.of(
114-
Arguments.arguments(new DatasetIdResetter(config -> {
115-
})),
114+
Arguments.arguments(new DatasetIdResetter(config -> {})),
116115
Arguments.arguments(new DatasetIdResetter(
117116
config -> {
118117
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
@@ -153,9 +152,9 @@ void setup(final TestInfo info) throws IOException {
153152

154153
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
155154
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId,
156-
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
157-
io.airbyte.protocol.models.Field
158-
.of("id", JsonSchemaType.STRING))
155+
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
156+
io.airbyte.protocol.models.Field
157+
.of("id", JsonSchemaType.STRING))
159158
.withDestinationSyncMode(DestinationSyncMode.APPEND),
160159
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING))));
161160

airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
*/
44

55
package io.airbyte.integrations.destination.bigquery;

airbyte-integrations/connectors/source-scaffold-source-http/source_scaffold_source_http/spec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ connectionSpecification:
77
- TODO
88
additionalProperties: false
99
properties:
10-
# 'TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.':
10+
# 'TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.':
1111
TODO:
1212
type: string
1313
description: describe me

0 commit comments

Comments
 (0)