Skip to content

Commit 01381ae

Browse files
authored
🐛 Source Shopify: fix one-time retry after Internal Server Error for BULK streams (#37468)
1 parent 5f1e4e6 commit 01381ae

File tree

5 files changed

+63
-24
lines changed

5 files changed

+63
-24
lines changed

airbyte-integrations/connectors/source-shopify/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: api
1212
connectorType: source
1313
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
14-
dockerImageTag: 2.0.5
14+
dockerImageTag: 2.0.6
1515
dockerRepository: airbyte/source-shopify
1616
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
1717
githubIssueLabel: source-shopify

airbyte-integrations/connectors/source-shopify/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.0.5"
6+
version = "2.0.6"
77
name = "source-shopify"
88
description = "Source CDK implementation for Shopify."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py

+20-6
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ class ShopifyBulkManager:
8181
job_should_revert_slice: bool = field(init=False, default=False)
8282
# running job log counter
8383
log_job_state_msg_count: int = field(init=False, default=0)
84+
# one time retryable error counter
85+
_one_time_error_retried: bool = field(init=False, default=False)
8486

8587
@property
8688
def tools(self) -> BulkTools:
@@ -185,6 +187,8 @@ def __reset_state(self) -> None:
185187
self.job_self_canceled = False
186188
# set the running job message counter to default
187189
self.log_job_state_msg_count = 0
190+
# set one time retry flag to default
191+
self._one_time_error_retried = False
188192

189193
def job_completed(self) -> bool:
190194
return self.job_state == ShopifyBulkStatus.COMPLETED.value
@@ -306,6 +310,15 @@ def job_check_for_errors(self, response: requests.Response) -> Union[AirbyteTrac
306310
f"Couldn't check the `response` for `errors`, status: {response.status_code}, response: `{response.text}`. Trace: {repr(e)}."
307311
)
308312

313+
def job_one_time_retry_error(self, response: requests.Response, exception: Exception) -> Optional[requests.Response]:
314+
if not self._one_time_error_retried:
315+
request = response.request
316+
self.logger.info(f"Stream: `{self.stream_name}`, retrying `Bad Request`: {request.body}. Error: {repr(exception)}.")
317+
self._one_time_error_retried = True
318+
return self.job_retry_request(request)
319+
else:
320+
self.on_job_with_errors(self.job_check_for_errors(response))
321+
309322
def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
310323
# format Job state check args
311324
status_args = self.job_get_request_args(ShopifyBulkTemplates.status)
@@ -322,19 +335,19 @@ def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
322335
else:
323336
# execute ERRORS scenario
324337
self.on_job_with_errors(errors)
325-
except ShopifyBulkExceptions.BulkJobBadResponse as e:
326-
request = response.request
327-
self.logger.info(f"Stream: `{self.stream_name}`, retrying Bad Request: {request.body}. Error: {repr(e)}.")
328-
return self.job_retry_request(request)
338+
except (
339+
ShopifyBulkExceptions.BulkJobBadResponse,
340+
ShopifyBulkExceptions.BulkJobUnknownError,
341+
) as error:
342+
return self.job_one_time_retry_error(response, error)
329343

330344
def job_check_state(self) -> Optional[str]:
345+
response: Optional[requests.Response] = None
331346
while not self.job_completed():
332347
if self.job_canceled():
333-
response = None
334348
break
335349
else:
336350
response = self.job_track_running()
337-
338351
# return `job_result_url` when status is `COMPLETED`
339352
return self.job_get_result(response)
340353

@@ -430,6 +443,7 @@ def job_check(self, created_job_response: requests.Response) -> Optional[str]:
430443
ShopifyBulkExceptions.BulkJobFailed,
431444
ShopifyBulkExceptions.BulkJobTimout,
432445
ShopifyBulkExceptions.BulkJobAccessDenied,
446+
# this one is one-time retriable
433447
ShopifyBulkExceptions.BulkJobUnknownError,
434448
) as bulk_job_error:
435449
raise bulk_job_error

airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py

+40-16
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import requests
88
from source_shopify.shopify_graphql.bulk.exceptions import ShopifyBulkExceptions
99
from source_shopify.shopify_graphql.bulk.job import ShopifyBulkStatus
10-
from source_shopify.streams.base_streams import IncrementalShopifyGraphQlBulkStream
1110
from source_shopify.streams.streams import (
1211
Collections,
1312
CustomerAddress,
@@ -119,28 +118,21 @@ def test_job_retry_on_concurrency(request, requests_mock, bulk_job_response, con
119118

120119

121120
@pytest.mark.parametrize(
122-
"job_response, error_type, patch_healthcheck, expected",
121+
"job_response, error_type, expected",
123122
[
124-
(
125-
"bulk_job_completed_response",
126-
None,
127-
False,
128-
"bulk-123456789.jsonl",
129-
),
130-
("bulk_job_failed_response", ShopifyBulkExceptions.BulkJobFailed, False, "exited with FAILED"),
131-
("bulk_job_timeout_response", ShopifyBulkExceptions.BulkJobTimout, False, "exited with TIMEOUT"),
132-
("bulk_job_access_denied_response", ShopifyBulkExceptions.BulkJobAccessDenied, False, "exited with ACCESS_DENIED"),
133-
("bulk_successful_response_with_errors", ShopifyBulkExceptions.BulkJobUnknownError, True, "Could not validate the status of the BULK Job"),
123+
("bulk_job_completed_response", None, "bulk-123456789.jsonl"),
124+
("bulk_job_failed_response", ShopifyBulkExceptions.BulkJobFailed, "exited with FAILED"),
125+
("bulk_job_timeout_response", ShopifyBulkExceptions.BulkJobTimout, "exited with TIMEOUT"),
126+
("bulk_job_access_denied_response", ShopifyBulkExceptions.BulkJobAccessDenied, "exited with ACCESS_DENIED"),
134127
],
135128
ids=[
136129
"completed",
137130
"failed",
138131
"timeout",
139132
"access_denied",
140-
"success with errors (edge)",
141133
],
142134
)
143-
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, patch_healthcheck, expected) -> None:
135+
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
144136
stream = MetafieldOrders(auth_config)
145137
# modify the sleep time for the test
146138
stream.job_manager.concurrent_max_retry = 1
@@ -151,8 +143,6 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
151143
# patching the method to get the right ID checks
152144
if job_id:
153145
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
154-
if patch_healthcheck:
155-
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_healthcheck", value=job_response)
156146
# mocking the response for STATUS CHECKS
157147
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
158148
test_job_status_response = requests.post(stream.job_manager.base_url)
@@ -167,6 +157,40 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
167157
requests_mock.get(job_result_url, json=request.getfixturevalue(job_response))
168158
result = stream.job_manager.job_check(test_job_status_response)
169159
assert expected == result
160+
161+
162+
@pytest.mark.parametrize(
163+
"job_response, error_type, expected",
164+
[
165+
(
166+
"bulk_successful_response_with_errors",
167+
ShopifyBulkExceptions.BulkJobUnknownError,
168+
"Could not validate the status of the BULK Job",
169+
),
170+
],
171+
ids=[
172+
"success with errors (edge)",
173+
],
174+
)
175+
def test_one_time_retry_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
176+
stream = MetafieldOrders(auth_config)
177+
# modify the sleep time for the test
178+
stream.job_manager.concurrent_max_retry = 1
179+
stream.job_manager.concurrent_interval_sec = 1
180+
stream.job_manager.job_check_interval_sec = 1
181+
# get job_id from FIXTURE
182+
job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
183+
# patching the method to get the right ID checks
184+
if job_id:
185+
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
186+
# mocking the response for STATUS CHECKS
187+
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
188+
test_job_status_response = requests.post(stream.job_manager.base_url)
189+
with pytest.raises(error_type) as error:
190+
stream.job_manager.job_check(test_job_status_response)
191+
# The retried request should FAIL here, because we stil want to see the Exception raised
192+
# We expect the call count to be 4 due to the status checks, the non-retried request would take 2 calls.
193+
assert expected in repr(error.value) and requests_mock.call_count == 4
170194

171195

172196
@pytest.mark.parametrize(

docs/integrations/sources/shopify.md

+1
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https
207207

208208
| Version | Date | Pull Request | Subject |
209209
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
210+
| 2.0.6 | 2024-04-22 | [37468](https://github.com/airbytehq/airbyte/pull/37468) | Fixed one time retry for `Internal Server Error` for BULK streams |
210211
| 2.0.5 | 2024-04-03 | [36788](https://github.com/airbytehq/airbyte/pull/36788) | Added ability to dynamically adjust the size of the `slice` |
211212
| 2.0.4 | 2024-03-22 | [36355](https://github.com/airbytehq/airbyte/pull/36355) | Update CDK version to ensure Per-Stream Error Messaging and Record Counts In State (features were already there so just upping the version) |
212213
| 2.0.3 | 2024-03-15 | [36170](https://github.com/airbytehq/airbyte/pull/36170) | Fixed the `STATE` messages emittion frequency for the `nested` sub-streams |

0 commit comments

Comments
 (0)