
Commit d34b521

🐛 Source Shopify: Allow the known HTTP errors to be retried more than once for the BULK streams (#37589)

1 parent 8665eaf commit d34b521

File tree: 10 files changed, +403, -325 lines changed

airbyte-integrations/connectors/source-shopify/metadata.yaml (+1, -1)

@@ -11,7 +11,7 @@ data:
  connectorSubtype: api
  connectorType: source
  definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
- dockerImageTag: 2.0.7
+ dockerImageTag: 2.0.8
  dockerRepository: airbyte/source-shopify
  documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
  githubIssueLabel: source-shopify

airbyte-integrations/connectors/source-shopify/pyproject.toml (+1, -1)

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
- version = "2.0.7"
+ version = "2.0.8"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py (+223, -240)

Large diffs are not rendered by default.
New file (+50, -0):

@@ -0,0 +1,50 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from functools import wraps
from time import sleep
from typing import Any, Callable, Final, Optional, Tuple, Type

from airbyte_cdk import AirbyteLogger

from .exceptions import ShopifyBulkExceptions

BULK_RETRY_ERRORS: Final[Tuple] = (
    ShopifyBulkExceptions.BulkJobBadResponse,
    ShopifyBulkExceptions.BulkJobUnknownError,
)


def bulk_retry_on_exception(logger: AirbyteLogger, more_exceptions: Optional[Tuple[Type[Exception], ...]] = None) -> Callable:
    """
    A decorator to retry a function when specified exceptions are raised.

    :param logger: The logger used to emit retry warnings and the final error.
    :param more_exceptions: An optional tuple of additional exception types to catch.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(self, *args, **kwargs) -> Any:
            # mandatory class attributes
            max_retries = self._job_max_retries
            stream_name = self.stream_name
            backoff_time = self._job_backoff_time

            current_retries = 0
            while True:
                try:
                    return func(self, *args, **kwargs)
                except BULK_RETRY_ERRORS or more_exceptions as ex:
                    current_retries += 1
                    if current_retries > max_retries:
                        logger.error("Exceeded retry limit. Giving up.")
                        raise
                    else:
                        logger.warning(
                            f"Stream `{stream_name}`: {ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds."
                        )
                        sleep(backoff_time)

        return wrapper

    return decorator
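
For orientation, here is a minimal, hypothetical sketch of how this decorator could be applied. It is not taken from job.py (that diff is not rendered above); the class, method name, and values are illustrative, a standard logging.Logger stands in for AirbyteLogger (the wrapper only calls warning and error), and it assumes bulk_retry_on_exception and ShopifyBulkExceptions from the modules above are in scope and that the exception accepts a plain message. The wrapper reads _job_max_retries, _job_backoff_time, and stream_name from the decorated method's instance.

import logging

class DemoBulkManager:
    """Hypothetical stand-in for the real BULK job manager."""

    def __init__(self) -> None:
        self._job_max_retries = 6      # read by the wrapper: retries allowed before giving up
        self._job_backoff_time = 5     # read by the wrapper: seconds slept between attempts
        self.stream_name = "products"  # read by the wrapper: used in the warning message
        self._calls = 0

    @bulk_retry_on_exception(logging.getLogger("airbyte"))
    def job_healthcheck(self) -> str:
        # Fail twice with a retryable error, then succeed on the third attempt.
        self._calls += 1
        if self._calls < 3:
            raise ShopifyBulkExceptions.BulkJobBadResponse("transient bad response")
        return "ok"

# DemoBulkManager().job_healthcheck() logs two warnings, sleeps twice, and returns "ok".
# Once more than `_job_max_retries` consecutive failures occur, the wrapper logs an error and re-raises.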
New file (+14, -0):

@@ -0,0 +1,14 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from enum import Enum


class ShopifyBulkJobStatus(Enum):
    CREATED = "CREATED"
    CANCELED = "CANCELED"
    CANCELING = "CANCELING"
    COMPLETED = "COMPLETED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    TIMEOUT = "TIMEOUT"
    ACCESS_DENIED = "ACCESS_DENIED"
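
A brief, illustrative example (not taken from the connector itself) of how a raw status string from a BULK job response could be mapped onto this enum:

raw_status = "RUNNING"                     # e.g. parsed from the GraphQL job payload
status = ShopifyBulkJobStatus(raw_status)  # raises ValueError for an unknown status string

if status == ShopifyBulkJobStatus.COMPLETED:
    print("job finished, results can be fetched")
elif status in (ShopifyBulkJobStatus.FAILED, ShopifyBulkJobStatus.ACCESS_DENIED):
    print("job ended with an error")
else:
    print(f"job still in progress: {status.value}")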

airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py (+6, -4)

@@ -642,7 +642,7 @@ def __init__(self, config: Dict) -> None:
        # define BULK Manager instance
        self.job_manager: ShopifyBulkManager = ShopifyBulkManager(
            session=self._session,
-           base_url=f"{self.url_base}/{self.path()}",
+           base_url=f"{self.url_base}{self.path()}",
            stream_name=self.name,
        )
        # override the default job slice size, if provided (it's auto-adjusted later on)

@@ -748,7 +748,7 @@ def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Un
            return self.config.get("start_date")

    def emit_slice_message(self, slice_start: datetime, slice_end: datetime) -> None:
-       slice_size_message = f"Slice size: `P{round(self.job_manager.job_size, 1)}D`"
+       slice_size_message = f"Slice size: `P{round(self.job_manager._job_size, 1)}D`"
        self.logger.info(f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. {slice_size_message}")

    @stream_state_cache.cache_stream_state

@@ -773,8 +773,10 @@ def process_bulk_results(
        response: requests.Response,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
-       # get results fetched from COMPLETED BULK Job or `None`
-       filename = self.job_manager.job_check(response)
+       # process the CREATED Job prior to other actions
+       self.job_manager.job_process_created(response)
+       # get results fetched from COMPLETED BULK Job
+       filename = self.job_manager.job_check_for_completion()
        # the `filename` could be `None`, meaning there are no data available for the slice period.
        if filename:
            # add `shop_url` field to each record produced

airbyte-integrations/connectors/source-shopify/source_shopify/utils.py (+11, -20)

@@ -86,6 +86,7 @@ class ShopifyRateLimiter:
    """

    on_unknown_load: float = 1.0
+   on_very_low_load: float = 0.0
    on_low_load: float = 0.2
    on_mid_load: float = 1.5
    on_high_load: float = 5.0

@@ -122,20 +123,26 @@ def _convert_load_to_time(load: Optional[float], threshold: float) -> float:
        :: wait_time - time to wait between each request in seconds

        """
-       mid_load = threshold / 2  # average load based on threshold
+
+       half_of_threshold = threshold / 2  # average load based on threshold
+       quarter_of_threshold = threshold / 4  # low load based on threshold
+
        if not load:
            # when there is no rate_limits from header, use the `sleep_on_unknown_load`
            wait_time = ShopifyRateLimiter.on_unknown_load
            ShopifyRateLimiter.log_message_counter("API Load: `REGULAR`")
-       elif load >= threshold:
+       elif threshold <= load:
            wait_time = ShopifyRateLimiter.on_high_load
            ShopifyRateLimiter.log_message_counter("API Load: `HIGH`")
-       elif load >= mid_load:
+       elif half_of_threshold <= load < threshold:
            wait_time = ShopifyRateLimiter.on_mid_load
            ShopifyRateLimiter.log_message_counter("API Load: `MID`")
-       elif load < mid_load:
+       elif quarter_of_threshold <= load < half_of_threshold:
            wait_time = ShopifyRateLimiter.on_low_load
            ShopifyRateLimiter.log_message_counter("API Load: `LOW`")
+       elif load < quarter_of_threshold:
+           wait_time = ShopifyRateLimiter.on_very_low_load
+
        return wait_time

@@ -219,22 +226,6 @@ def get_graphql_api_wait_time(*args, threshold: float = 0.9) -> float:
        wait_time = ShopifyRateLimiter._convert_load_to_time(load, threshold)
        return wait_time

-   def _debug_info(*args) -> Any:
-       # find the requests.Response inside args list
-       response = ShopifyRateLimiter.get_response_from_args(*args)
-
-       if response:
-           try:
-               content = response.json()
-               content_keys = list(content.keys())
-               stream_name = content_keys[0] if len(content_keys) > 0 else None
-               content_lengh = len(content.get(stream_name, [])) if stream_name else None
-               debug_info = {"stream": stream_name, "url": response.request.url, "n_records": content_lengh}
-               return debug_info
-           except (requests.JSONDecodeError, Exception):
-               # bypassing the errors, we don't care about it here
-               pass
-
    @staticmethod
    def wait_time(wait_time: float) -> None:
        return sleep(wait_time)
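
To make the new tiering concrete, here is a rough sketch of the wait times produced for a few example loads, assuming the default threshold = 0.9 shown in get_graphql_api_wait_time above and that the connector package is importable; calling the private helper directly is for illustration only.

from source_shopify.utils import ShopifyRateLimiter

# With threshold = 0.9 the branches above resolve to:
#   load >= 0.9           -> on_high_load     (5.0 s)
#   0.45 <= load < 0.9    -> on_mid_load      (1.5 s)
#   0.225 <= load < 0.45  -> on_low_load      (0.2 s)
#   load < 0.225          -> on_very_low_load (0.0 s)
#   load missing or zero  -> on_unknown_load  (1.0 s)
for load in (None, 0.1, 0.3, 0.6, 0.95):
    print(load, ShopifyRateLimiter._convert_load_to_time(load, threshold=0.9))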
