Skip to content

Commit 5001474

Browse files
authored
🎉 Source Pinterest: Added backoff strategy for rate-limit errors (#16161)
1 parent ebd0f30 commit 5001474

File tree

6 files changed

+56
-16
lines changed

6 files changed

+56
-16
lines changed

‎airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@
750750
- name: Pinterest
751751
sourceDefinitionId: 5cb7e5fe-38c2-11ec-8d3d-0242ac130003
752752
dockerRepository: airbyte/source-pinterest
753-
dockerImageTag: 0.1.3
753+
dockerImageTag: 0.1.4
754754
documentationUrl: https://docs.airbyte.io/integrations/sources/pinterest
755755
icon: pinterest.svg
756756
sourceType: api

‎airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7281,7 +7281,7 @@
72817281
supportsNormalization: false
72827282
supportsDBT: false
72837283
supported_destination_sync_modes: []
7284-
- dockerImage: "airbyte/source-pinterest:0.1.3"
7284+
- dockerImage: "airbyte/source-pinterest:0.1.4"
72857285
spec:
72867286
documentationUrl: "https://docs.airbyte.io/integrations/sources/pinterest"
72877287
connectionSpecification:

‎airbyte-integrations/connectors/source-pinterest/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,5 @@ COPY source_pinterest ./source_pinterest
3434
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
3535
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
3636

37-
LABEL io.airbyte.version=0.1.3
37+
LABEL io.airbyte.version=0.1.4
3838
LABEL io.airbyte.name=airbyte/source-pinterest

‎airbyte-integrations/connectors/source-pinterest/source_pinterest/source.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
from .utils import analytics_columns, to_datetime_str
2020

21+
# For Pinterest analytics streams rate limit is 300 calls per day / per user.
22+
# once hit - response would contain `code` property with int.
23+
MAX_RATE_LIMIT_CODE = 8
24+
2125

2226
class PinterestStream(HttpStream, ABC):
2327
url_base = "https://api.pinterest.com/v5/"
@@ -51,20 +55,10 @@ def request_params(
5155

5256
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
5357
"""
54-
For Pinterest analytics streams rate limit is 300 calls per day / per user.
55-
Handling of rate limits for Pinterest analytics streams described in should_retry method of PinterestAnalyticsStream.
56-
Response example:
57-
{
58-
"code": 8,
59-
"message": "You have exceeded your rate limit. Try again later."
60-
}
58+
Parsing response data with respect to Rate Limits.
6159
"""
62-
6360
data = response.json()
6461

65-
if isinstance(data, dict):
66-
self.max_rate_limit_exceeded = data.get("code") == 8
67-
6862
if not self.max_rate_limit_exceeded:
6963
for data_field in self.data_fields:
7064
data = data.get(data_field, [])
@@ -73,12 +67,19 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
7367
yield record
7468

7569
def should_retry(self, response: requests.Response) -> bool:
70+
if isinstance(response.json(), dict):
71+
self.max_rate_limit_exceeded = response.json().get("code", 0) == MAX_RATE_LIMIT_CODE
7672
# when max rate limit exceeded, we should skip the stream.
77-
if response.status_code == 429 and response.json().get("code") == 8:
78-
self.logger.error(f"For stream {self.name} max rate limit exceeded.")
73+
if response.status_code == requests.codes.too_many_requests and self.max_rate_limit_exceeded:
74+
self.logger.error(f"For stream {self.name} Max Rate Limit exceeded.")
7975
setattr(self, "raise_on_http_errors", False)
8076
return 500 <= response.status_code < 600
8177

78+
def backoff_time(self, response: requests.Response) -> Optional[float]:
79+
if response.status_code == requests.codes.too_many_requests:
80+
self.logger.error(f"For stream {self.name} rate limit exceeded.")
81+
return float(response.headers.get("X-RateLimit-Reset", 0))
82+
8283

8384
class PinterestSubStream(HttpSubStream):
8485
def stream_slices(

‎airbyte-integrations/connectors/source-pinterest/unit_tests/test_streams.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from unittest.mock import MagicMock
77

88
import pytest
9+
import requests
910
from source_pinterest.source import (
1011
AdAccountAnalytics,
1112
AdAccounts,
@@ -95,6 +96,43 @@ def test_backoff_time(patch_base_class):
9596
assert stream.backoff_time(response_mock) == expected_backoff_time
9697

9798

99+
@pytest.mark.parametrize(
100+
"test_response, status_code, expected",
101+
[
102+
({"code": 8, "message": "You have exceeded your rate limit. Try again later."}, 429, False),
103+
({"code": 7, "message": "Some other error message"}, 429, False),
104+
],
105+
)
106+
def test_should_retry_on_max_rate_limit_error(requests_mock, test_response, status_code, expected):
107+
stream = Boards(config=MagicMock())
108+
url = "https://api.pinterest.com/v5/boards"
109+
requests_mock.get("https://api.pinterest.com/v5/boards", json=test_response, status_code=status_code)
110+
response = requests.get(url)
111+
result = stream.should_retry(response)
112+
assert result == expected
113+
114+
115+
@pytest.mark.parametrize(
116+
"test_response, test_headers, status_code, expected",
117+
[
118+
({"code": 7, "message": "Some other error message"}, {"X-RateLimit-Reset": "2"}, 429, 2.0),
119+
],
120+
)
121+
def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code, test_headers, expected):
122+
stream = Boards(config=MagicMock())
123+
url = "https://api.pinterest.com/v5/boards"
124+
requests_mock.get(
125+
"https://api.pinterest.com/v5/boards",
126+
json=test_response,
127+
headers=test_headers,
128+
status_code=status_code,
129+
)
130+
131+
response = requests.get(url)
132+
result = stream.backoff_time(response)
133+
assert result == expected
134+
135+
98136
@pytest.mark.parametrize(
99137
("stream_cls, slice, expected"),
100138
[

‎docs/integrations/sources/pinterest.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ Boards streams - 10 calls per sec / per user / per app
7171

7272
| Version | Date | Pull Request | Subject |
7373
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------ |
74+
| 0.1.4 | 2022-09-06 | [16161](https://github.com/airbytehq/airbyte/pull/16161) | Added ability to handle `429 - Too Many Requests` error with respect to `Max Rate Limit Exceeded Error`
7475
| 0.1.3 | 2022-09-02 | [16271](https://github.com/airbytehq/airbyte/pull/16271) | Added support of `OAuth2.0` authentication method
7576
| 0.1.2 | 2021-12-22 | [10223](https://github.com/airbytehq/airbyte/pull/10223) | Fix naming of `AD_ID` and `AD_ACCOUNT_ID` fields |
7677
| 0.1.1 | 2021-12-22 | [9043](https://github.com/airbytehq/airbyte/pull/9043) | Update connector fields title/description |

0 commit comments

Comments
 (0)