Skip to content

🐛 Source Shopify: fix one-time retry after Internal Server Error for BULK streams #37468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.0.5
dockerImageTag: 2.0.6
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.0.5"
version = "2.0.6"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class ShopifyBulkManager:
job_should_revert_slice: bool = field(init=False, default=False)
# running job log counter
log_job_state_msg_count: int = field(init=False, default=0)
# one time retryable error counter
one_time_error_retried: bool = field(init=False, default=False)

@property
def tools(self) -> BulkTools:
Expand Down Expand Up @@ -185,6 +187,8 @@ def __reset_state(self) -> None:
self.job_self_canceled = False
# set the running job message counter to default
self.log_job_state_msg_count = 0
# set one time retry flag to default
self.one_time_error_retried = False

def job_completed(self) -> bool:
return self.job_state == ShopifyBulkStatus.COMPLETED.value
Expand Down Expand Up @@ -306,6 +310,15 @@ def job_check_for_errors(self, response: requests.Response) -> Union[AirbyteTrac
f"Couldn't check the `response` for `errors`, status: {response.status_code}, response: `{response.text}`. Trace: {repr(e)}."
)

def job_one_time_retry_error(self, response: requests.Response, exception: Exception) -> Optional[requests.Response]:
if not self.one_time_error_retried:
request = response.request
self.logger.info(f"Stream: `{self.stream_name}`, retrying `Bad Request`: {request.body}. Error: {repr(exception)}.")
self.one_time_error_retried = True
return self.job_retry_request(request)
else:
self.on_job_with_errors(self.job_check_for_errors(response))

def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
# format Job state check args
status_args = self.job_get_request_args(ShopifyBulkTemplates.status)
Expand All @@ -322,19 +335,19 @@ def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
else:
# execute ERRORS scenario
self.on_job_with_errors(errors)
except ShopifyBulkExceptions.BulkJobBadResponse as e:
request = response.request
self.logger.info(f"Stream: `{self.stream_name}`, retrying Bad Request: {request.body}. Error: {repr(e)}.")
return self.job_retry_request(request)
except (
ShopifyBulkExceptions.BulkJobBadResponse,
ShopifyBulkExceptions.BulkJobUnknownError,
) as error:
return self.job_one_time_retry_error(response, error)

def job_check_state(self) -> Optional[str]:
response: Optional[requests.Response] = None
while not self.job_completed():
if self.job_canceled():
response = None
break
else:
response = self.job_track_running()

# return `job_result_url` when status is `COMPLETED`
return self.job_get_result(response)

Expand Down Expand Up @@ -430,6 +443,7 @@ def job_check(self, created_job_response: requests.Response) -> Optional[str]:
ShopifyBulkExceptions.BulkJobFailed,
ShopifyBulkExceptions.BulkJobTimout,
ShopifyBulkExceptions.BulkJobAccessDenied,
# this one is one-time retriable
ShopifyBulkExceptions.BulkJobUnknownError,
) as bulk_job_error:
raise bulk_job_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,28 +119,21 @@ def test_job_retry_on_concurrency(request, requests_mock, bulk_job_response, con


@pytest.mark.parametrize(
"job_response, error_type, patch_healthcheck, expected",
"job_response, error_type, expected",
[
(
"bulk_job_completed_response",
None,
False,
"bulk-123456789.jsonl",
),
("bulk_job_failed_response", ShopifyBulkExceptions.BulkJobFailed, False, "exited with FAILED"),
("bulk_job_timeout_response", ShopifyBulkExceptions.BulkJobTimout, False, "exited with TIMEOUT"),
("bulk_job_access_denied_response", ShopifyBulkExceptions.BulkJobAccessDenied, False, "exited with ACCESS_DENIED"),
("bulk_successful_response_with_errors", ShopifyBulkExceptions.BulkJobUnknownError, True, "Could not validate the status of the BULK Job"),
("bulk_job_completed_response", None, "bulk-123456789.jsonl"),
("bulk_job_failed_response", ShopifyBulkExceptions.BulkJobFailed, "exited with FAILED"),
("bulk_job_timeout_response", ShopifyBulkExceptions.BulkJobTimout, "exited with TIMEOUT"),
("bulk_job_access_denied_response", ShopifyBulkExceptions.BulkJobAccessDenied, "exited with ACCESS_DENIED"),
],
ids=[
"completed",
"failed",
"timeout",
"access_denied",
"success with errors (edge)",
],
)
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, patch_healthcheck, expected) -> None:
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
stream = MetafieldOrders(auth_config)
# modify the sleep time for the test
stream.job_manager.concurrent_max_retry = 1
Expand All @@ -151,8 +144,6 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
# patching the method to get the right ID checks
if job_id:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
if patch_healthcheck:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_healthcheck", value=job_response)
# mocking the response for STATUS CHECKS
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
test_job_status_response = requests.post(stream.job_manager.base_url)
Expand All @@ -167,6 +158,46 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
requests_mock.get(job_result_url, json=request.getfixturevalue(job_response))
result = stream.job_manager.job_check(test_job_status_response)
assert expected == result


@pytest.mark.parametrize(
"job_response, error_type, patch_healthcheck, expected",
[
(
"bulk_successful_response_with_errors",
ShopifyBulkExceptions.BulkJobUnknownError,
True,
"Could not validate the status of the BULK Job",
),
],
ids=[
"success with errors (edge)",
],
)
def test_one_time_retry_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, patch_healthcheck, expected) -> None:
stream = MetafieldOrders(auth_config)
# modify the sleep time for the test
stream.job_manager.concurrent_max_retry = 1
stream.job_manager.concurrent_interval_sec = 1
stream.job_manager.job_check_interval_sec = 1
# get job_id from FIXTURE
job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
# patching the method to get the right ID checks
if job_id:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
if patch_healthcheck:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_healthcheck", value=job_response)
# mocking the response for STATUS CHECKS
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
test_job_status_response = requests.post(stream.job_manager.base_url)

with pytest.raises(error_type) as error:
stream.job_manager.job_check(test_job_status_response)
# check the flag was set to `True`, meaning `Retried`
assert stream.job_manager.one_time_error_retried
# Check for the flag reset after the request was retried.
# The retried request should FAIL here, because we stil want to see the Exception raised
assert not stream.job_manager.one_time_error_retried and expected in repr(error.value)


@pytest.mark.parametrize(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.6 | 2024-04-22 | [37468](https://github.com/airbytehq/airbyte/pull/37468) | Fixed one time retry for `Internal Server Error` for BULK streams |
| 2.0.5 | 2024-04-03 | [36788](https://github.com/airbytehq/airbyte/pull/36788) | Added ability to dynamically adjust the size of the `slice` |
| 2.0.4 | 2024-03-22 | [36355](https://github.com/airbytehq/airbyte/pull/36355) | Update CDK version to ensure Per-Stream Error Messaging and Record Counts In State (features were already there so just upping the version) |
| 2.0.3 | 2024-03-15 | [36170](https://github.com/airbytehq/airbyte/pull/36170) | Fixed the `STATE` messages emittion frequency for the `nested` sub-streams |
Expand Down
Loading