Skip to content

Commit 6cf3e83

Browse files
feat(source-amazon-seller-partner): rate limits for fatal status (#42052)
1 parent 7ce2ff1 commit 6cf3e83

File tree

11 files changed

+160
-66
lines changed

11 files changed

+160
-66
lines changed

airbyte-integrations/connectors/source-amazon-seller-partner/acceptance-test-config.yml

+4
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ acceptance_tests:
9999
bypass_reason: "Data cannot be seeded in the test account, integration tests added for the stream instead"
100100
- name: GET_VENDOR_FORECASTING_RETAIL_REPORT
101101
bypass_reason: "Data cannot be seeded in the test account, integration tests added for the stream instead"
102+
- name: GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL
103+
bypass_reason: "Data for the stream only available for 2 years. Expired for now. Seeding in progress."
104+
- name: GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL
105+
bypass_reason: "Data for the stream only available for 2 years. Expired for now. Seeding in progress."
102106
incremental:
103107
tests:
104108
- config_path: "secrets/config.json"

airbyte-integrations/connectors/source-amazon-seller-partner/integration_tests/expected_records.jsonl

+9-48
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-amazon-seller-partner/integration_tests/spec_oss.json

+7
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,13 @@
205205
}
206206
}
207207
}
208+
},
209+
"wait_to_avoid_fatal_errors": {
210+
"title": "Wait between requests to avoid fatal statuses in reports",
211+
"type": "boolean",
212+
"description": "For report based streams with known amount of requests per time period, this option will use waiting time between requests to avoid fatal statuses in reports. See <a href=\"https://docs.airbyte.com/integrations/sources/amazon-seller-partner#limitations--troubleshooting\" target=\"_blank\">Troubleshooting</a> section for more details",
213+
"default": false,
214+
"order": 11
208215
}
209216
}
210217
},

airbyte-integrations/connectors/source-amazon-seller-partner/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ data:
1515
connectorSubtype: api
1616
connectorType: source
1717
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
18-
dockerImageTag: 4.3.11
18+
dockerImageTag: 4.4.0
1919
dockerRepository: airbyte/source-amazon-seller-partner
2020
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
2121
githubIssueLabel: source-amazon-seller-partner

airbyte-integrations/connectors/source-amazon-seller-partner/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.3.11"
6+
version = "4.4.0"
77
name = "source-amazon-seller-partner"
88
description = "Source implementation for Amazon Seller Partner."
99
authors = ["Airbyte <[email protected]>"]

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/source.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
224224
if not report_kwargs:
225225
report_kwargs.append((stream.report_name, {}))
226226
for name, options in report_kwargs:
227-
kwargs = {"stream_name": name, "report_options": options, **stream_kwargs}
227+
kwargs = {
228+
"stream_name": name,
229+
"report_options": options,
230+
"wait_to_avoid_fatal_errors": config.get("wait_to_avoid_fatal_errors", False),
231+
**stream_kwargs,
232+
}
228233
streams.append(stream(**kwargs))
229234
return streams
230235

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/spec.json

+7
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,13 @@
197197
}
198198
}
199199
}
200+
},
201+
"wait_to_avoid_fatal_errors": {
202+
"title": "Wait between requests to avoid fatal statuses in reports",
203+
"type": "boolean",
204+
"description": "For report based streams with known amount of requests per time period, this option will use waiting time between requests to avoid fatal statuses in reports. See <a href=\"https://docs.airbyte.com/integrations/sources/amazon-seller-partner#limitations--troubleshooting\" target=\"_blank\">Troubleshooting</a> section for more details",
205+
"default": false,
206+
"order": 11
200207
}
201208
}
202209
},

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py

+21
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
2828
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2929
from airbyte_protocol.models import FailureType
30+
from source_amazon_seller_partner.utils import STREAM_THRESHOLD_PERIOD, threshold_period_decorator
3031

3132
REPORTS_API_VERSION = "2021-06-30"
3233
ORDERS_API_VERSION = "v0"
@@ -222,6 +223,7 @@ def __init__(
222223
period_in_days: Optional[int],
223224
replication_end_date: Optional[str],
224225
report_options: Optional[List[Mapping[str, Any]]] = None,
226+
wait_to_avoid_fatal_errors: Optional[bool] = False,
225227
*args,
226228
**kwargs,
227229
):
@@ -235,6 +237,8 @@ def __init__(
235237
self._http_method = "GET"
236238
self._stream_name = stream_name
237239

240+
self.wait_to_avoid_fatal_errors = wait_to_avoid_fatal_errors
241+
238242
@property
239243
def name(self):
240244
return self._stream_name
@@ -382,6 +386,7 @@ def stream_slices(
382386
}
383387
start_date = end_date_slice
384388

389+
@threshold_period_decorator
385390
def read_records(
386391
self,
387392
sync_mode: SyncMode,
@@ -480,6 +485,7 @@ def read_records(
480485
f" for period {stream_slice['dataStartTime']}-{stream_slice['dataEndTime']}. "
481486
f"This will be read during the next sync. Report ID: {report_id}."
482487
f" Error: {error_response}"
488+
" Visit https://docs.airbyte.com/integrations/sources/amazon-seller-partner#limitations--troubleshooting for more info."
483489
)
484490
raise AirbyteTracedException(internal_message=exception_message)
485491
elif processing_status == ReportProcessingStatus.CANCELLED:
@@ -582,6 +588,10 @@ class FbaStorageFeesReports(IncrementalReportsAmazonSPStream):
582588
class FulfilledShipmentsReports(IncrementalReportsAmazonSPStream):
583589
"""
584590
Field definitions: https://sellercentral.amazon.com/gp/help/help.html?itemID=200453120
591+
592+
Threshold 12
593+
Period (minutes) 480
594+
585595
"""
586596

587597
report_name = "GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL"
@@ -681,6 +691,13 @@ def parse_document(self, document):
681691

682692

683693
class FbaEstimatedFbaFeesTxtReport(IncrementalReportsAmazonSPStream):
694+
"""
695+
696+
Threshold 2000
697+
Period (minutes) 60
698+
699+
"""
700+
684701
report_name = "GET_FBA_ESTIMATED_FBA_FEES_TXT_DATA"
685702

686703

@@ -1110,6 +1127,10 @@ class FbaAfnInventoryReports(IncrementalReportsAmazonSPStream):
11101127
Field definitions: https://developer-docs.amazon.com/sp-api/docs/report-type-values#inventory-reports
11111128
Report has a long-running issue (fails when requested frequently):
11121129
https://github.com/amzn/selling-partner-api-docs/issues/2231
1130+
1131+
Threshold 2
1132+
Period (minutes) 25
1133+
11131134
"""
11141135

11151136
report_name = "GET_AFN_INVENTORY_DATA"
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,60 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
import logging
5+
import time
56

67
from airbyte_cdk.utils import AirbyteTracedException
78
from airbyte_protocol.models import FailureType
89

10+
LOG_LEVEL = logging.getLevelName("INFO")
11+
LOGGER = logging.getLogger("airbyte")
12+
913

1014
class AmazonConfigException(AirbyteTracedException):
1115
def __init__(self, **kwargs):
1216
failure_type: FailureType = FailureType.config_error
1317
super(AmazonConfigException, self).__init__(failure_type=failure_type, **kwargs)
18+
19+
20+
class ReportRateLimits:
21+
def __init__(self, threshold: int, period_in_minutes: int):
22+
self.threshold = threshold
23+
self.period_in_minutes = period_in_minutes
24+
25+
@property
26+
def wait_time_in_seconds(self):
27+
return (self.period_in_minutes / self.threshold) * 60
28+
29+
30+
# https://github.com/airbytehq/alpha-beta-issues/issues/3717#issuecomment-2203717834
31+
STREAM_THRESHOLD_PERIOD = {
32+
# Threshold sleep logic for GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL takes 40m between requests,
33+
# which is too long for certified connectors. Keeping it as documentation for now.
34+
# "GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL": ReportRateLimits(threshold=12, period_in_minutes=480),
35+
"GET_AFN_INVENTORY_DATA": ReportRateLimits(threshold=2, period_in_minutes=25),
36+
"GET_FBA_ESTIMATED_FBA_FEES_TXT_DATA": ReportRateLimits(threshold=2000, period_in_minutes=60),
37+
}
38+
39+
40+
def threshold_period_decorator(func):
41+
def wrapped(*args, **kwargs):
42+
stream_instance = args[0]
43+
stream_rate_limits = STREAM_THRESHOLD_PERIOD.get(stream_instance.name)
44+
45+
# Enable sleeping if stream has known threshold and period or reading without sleeping
46+
if stream_instance.wait_to_avoid_fatal_errors and stream_rate_limits:
47+
LOGGER.log(
48+
LOG_LEVEL,
49+
f"Stream {stream_instance.name} has a known rate limits values, applying to avoid rate limits.",
50+
)
51+
52+
for record in func(*args, **kwargs):
53+
yield record
54+
55+
LOGGER.log(LOG_LEVEL, f"Sleeping {stream_rate_limits.wait_time_in_seconds} seconds due to rate limits.")
56+
time.sleep(stream_rate_limits.wait_time_in_seconds)
57+
else:
58+
yield from func(*args, **kwargs)
59+
60+
return wrapped

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_streams.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ def test_read_records_retrieve_fatal(self, report_init_kwargs, mocker, requests_
139139
)
140140
assert e.value.internal_message == (
141141
f"Failed to retrieve the report 'GET_TEST_REPORT' for period {stream_start}-{stream_end}. "
142-
"This will be read during the next sync. Report ID: some_report_id. Error: Failed to retrieve the report result document."
142+
"This will be read during the next sync. Report ID: some_report_id. Error: Failed to retrieve the report result document. "
143+
"Visit https://docs.airbyte.com/integrations/sources/amazon-seller-partner#limitations--troubleshooting for more info."
143144
)
144145

145146
def test_read_records_retrieve_cancelled(self, report_init_kwargs, mocker, requests_mock, caplog):

docs/integrations/sources/amazon-seller-partner.md

+54-13
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ To pass the check for Seller and Vendor accounts, you must have access to the [O
6161
7. For `Start Date`, enter the date in `YYYY-MM-DD` format. The data added on and after this date will be replicated. This field is optional - if not provided or older than 2 years ago from today, the date 2 years ago from today will be used.
6262
8. For `End Date`, enter the date in `YYYY-MM-DD` format. Any data after this date will not be replicated. This field is optional - if not provided, today's date will be used.
6363
9. You can specify report options for each stream using **Report Options** section. Available options can be found in corresponding category [here](https://developer-docs.amazon.com/sp-api/docs/report-type-values).
64-
10. Click `Set up source`.
64+
10. For `Wait between requests to avoid fatal statuses in reports`, enable if you want to use wating time between requests to avoid fatal statuses in report based streams.
65+
11. Click `Set up source`.
6566

6667
<!-- /env:cloud -->
6768

@@ -76,7 +77,8 @@ To pass the check for Seller and Vendor accounts, you must have access to the [O
7677
5. For Start Date, enter the date in YYYY-MM-DD format. The data added on and after this date will be replicated. This field is optional - if not provided, the date 2 years ago from today will be used.
7778
6. For End Date, enter the date in YYYY-MM-DD format. Any data after this date will not be replicated. This field is optional - if not provided, today's date will be used.
7879
7. You can specify report options for each stream using **Report Options** section. Available options can be found in corresponding category [here](https://developer-docs.amazon.com/sp-api/docs/report-type-values).
79-
8. Click `Set up source`.
80+
8. For `Wait between requests to avoid fatal statuses in reports`, enable if you want to use wating time between requests to avoid fatal statuses in report based streams.
81+
9. Click `Set up source`.
8082

8183
<!-- /env:oss -->
8284

@@ -171,24 +173,63 @@ Information about rate limits you may find [here](https://developer-docs.amazon.
171173
| `array` | `array` |
172174
| `object` | `object` |
173175

176+
## Limitations & Troubleshooting
177+
178+
### Failed to retrieve the report
179+
180+
```
181+
Failed to retrieve the report 'YOUR_REPORT_NAME' for period 2024-01-01T12:01:15Z-2024-01-15T12:01:14Z.
182+
This will be read during the next sync. Report ID: YOUR_REPORT_ID. Error: Failed to retrieve the report result document.
183+
```
184+
185+
Requesting reports via Amazon Seller Partner API can lead to failed syncs with error above "Failed to retrieve the report...".
186+
187+
One of the reasons why users face this issue is that report requests were made too often.
188+
189+
**Solution 1:**
190+
191+
To overcome it you can force use sleeping between requests to avoid fatal statuses while requesting reports.
192+
193+
Steps:
194+
1. Go to the Set Up page of the connector.
195+
2. Open optional section.
196+
3. Enable `Wait between requests to avoid fatal statuses in reports` toggle.
197+
198+
Disadvantages of this approach is that syncs with waiting between requests are much slower than without it. So it is better to create a separate connection only for stream that usually fails with "Failed to retrieve the report..." error. This will help you to avoid affecting streams that worked as expected.
199+
200+
:::note
201+
202+
For now the waiting logic only work for the following streams:
203+
- GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL
204+
- GET_AFN_INVENTORY_DATA
205+
- GET_FBA_ESTIMATED_FBA_FEES_TXT_DATA
206+
207+
:::
208+
209+
**Solution 2:**
210+
211+
Create a separate connection for streams which usually fail with error above "Failed to retrieve the report..." and disable sync of these streams in the first connection with streams which don't fail because of the error. Adjust the sync time of these two connection to do not overlap. It's recommended to have a time break between syncs in the connections.
212+
213+
174214
## Changelog
175215

176216
<details>
177217
<summary>Expand to review</summary>
178218

179219
| Version | Date | Pull Request | Subject |
180220
|:--------|:-----------|:----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
181-
| 4.3.11 | 2024-07-13 | [41873](https://github.com/airbytehq/airbyte/pull/41873) | Update dependencies |
182-
| 4.3.10 | 2024-07-10 | [41345](https://github.com/airbytehq/airbyte/pull/41345) | Update dependencies |
183-
| 4.3.9 | 2024-07-09 | [41158](https://github.com/airbytehq/airbyte/pull/41158) | Update dependencies |
184-
| 4.3.8 | 2024-07-08 | [40751](https://github.com/airbytehq/airbyte/pull/40751) | Improve error messaging and turning on alerting |
185-
| 4.3.7 | 2024-07-06 | [40990](https://github.com/airbytehq/airbyte/pull/40990) | Update dependencies |
186-
| 4.3.6 | 2024-07-01 | [40590](https://github.com/airbytehq/airbyte/pull/40590) | Add log message when data only accessible to seller accounts, add report id in log message for fatal report status, add check for start date. |
187-
| 4.3.5 | 2024-06-27 | [40215](https://github.com/airbytehq/airbyte/pull/40215) | Replaced deprecated AirbyteLogger with logging.Logger |
188-
| 4.3.4 | 2024-06-25 | [40384](https://github.com/airbytehq/airbyte/pull/40384) | Update dependencies |
189-
| 4.3.3 | 2024-06-22 | [40008](https://github.com/airbytehq/airbyte/pull/40008) | Update dependencies |
190-
| 4.3.2 | 2024-06-13 | [39441](https://github.com/airbytehq/airbyte/pull/39441) | Update state handling for incremental streams |
191-
| 4.3.1 | 2024-06-04 | [38969](https://github.com/airbytehq/airbyte/pull/38969) | [autopull] Upgrade base image to v1.2.1 |
221+
| 4.4.0 | 2024-07-17 | [42052](https://github.com/airbytehq/airbyte/pull/42052) | Add waiting between requests logic to avoid failed report requests |
222+
| 4.3.11 | 2024-07-13 | [41873](https://github.com/airbytehq/airbyte/pull/41873) | Update dependencies |
223+
| 4.3.10 | 2024-07-10 | [41345](https://github.com/airbytehq/airbyte/pull/41345) | Update dependencies |
224+
| 4.3.9 | 2024-07-09 | [41158](https://github.com/airbytehq/airbyte/pull/41158) | Update dependencies |
225+
| 4.3.8 | 2024-07-08 | [40751](https://github.com/airbytehq/airbyte/pull/40751) | Improve error messaging and turning on alerting |
226+
| 4.3.7 | 2024-07-06 | [40990](https://github.com/airbytehq/airbyte/pull/40990) | Update dependencies |
227+
| 4.3.6 | 2024-07-01 | [40590](https://github.com/airbytehq/airbyte/pull/40590) | Add log message when data only accessible to seller accounts, add report id in log message for fatal report status, add check for start date. |
228+
| 4.3.5 | 2024-06-27 | [40215](https://github.com/airbytehq/airbyte/pull/40215) | Replaced deprecated AirbyteLogger with logging.Logger |
229+
| 4.3.4 | 2024-06-25 | [40384](https://github.com/airbytehq/airbyte/pull/40384) | Update dependencies |
230+
| 4.3.3 | 2024-06-22 | [40008](https://github.com/airbytehq/airbyte/pull/40008) | Update dependencies |
231+
| 4.3.2 | 2024-06-13 | [39441](https://github.com/airbytehq/airbyte/pull/39441) | Update state handling for incremental streams |
232+
| 4.3.1 | 2024-06-04 | [38969](https://github.com/airbytehq/airbyte/pull/38969) | [autopull] Upgrade base image to v1.2.1 |
192233
| 4.3.0 | 2024-05-24 | [#00000](https://github.com/airbytehq/airbyte/pull/00000) | Extend the report_options spec config with a `stream_name` attribute |
193234
| 4.2.4 | 2024-05-15 | [#38210](https://github.com/airbytehq/airbyte/pull/38210) | Fix `GET_VENDOR_TRAFFIC_REPORT` stream with report option `reportPeriod=DAY` |
194235
| 4.2.3 | 2024-05-09 | [#38078](https://github.com/airbytehq/airbyte/pull/38078) | Hide OSS-only streams in report options config for cloud users |

0 commit comments

Comments
 (0)