Skip to content

Commit f2efd27

Browse files
authored
🐛 Source Amazon Seller Partner: Fix check for Vendor accounts (#35331)
1 parent 088b9b7 commit f2efd27

File tree

11 files changed

+171
-174
lines changed

11 files changed

+171
-174
lines changed

airbyte-integrations/connectors/source-amazon-seller-partner/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ data:
1515
connectorSubtype: api
1616
connectorType: source
1717
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
18-
dockerImageTag: 3.4.0
18+
dockerImageTag: 3.5.0
1919
dockerRepository: airbyte/source-amazon-seller-partner
2020
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
2121
githubIssueLabel: source-amazon-seller-partner

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55

6+
import traceback
67
from os import getenv
78
from typing import Any, List, Mapping, Optional, Tuple
89

@@ -115,10 +116,11 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
115116

116117
if config.get("account_type", "Seller") == "Seller":
117118
stream_to_check = Orders(**stream_kwargs)
119+
next(stream_to_check.read_records(sync_mode=SyncMode.full_refresh))
118120
else:
119-
stream_to_check = VendorSalesReports(**stream_kwargs)
120-
121-
next(stream_to_check.read_records(sync_mode=SyncMode.full_refresh))
121+
stream_to_check = VendorOrders(**stream_kwargs)
122+
stream_slices = list(stream_to_check.stream_slices(sync_mode=SyncMode.full_refresh))
123+
next(stream_to_check.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices[0]))
122124

123125
return True, None
124126
except Exception as e:

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import csv
77
import gzip
88
import json
9+
import logging
910
import os
1011
import time
1112
from abc import ABC, abstractmethod
@@ -282,6 +283,15 @@ def _retrieve_report(self, report_id: str) -> Mapping[str, Any]:
282283

283284
return report_payload
284285

286+
def _retrieve_report_result(self, report_document_id: str) -> requests.Response:
287+
request_headers = self.request_headers()
288+
request = self._create_prepared_request(
289+
path=self.path(document_id=report_document_id),
290+
headers=dict(request_headers, **self.authenticator.get_auth_header()),
291+
params=self.request_params(),
292+
)
293+
return self._send_request(request, {})
294+
285295
@default_backoff_handler(factor=5, max_tries=5)
286296
def download_and_decompress_report_document(self, payload: dict) -> str:
287297
"""
@@ -381,23 +391,29 @@ def read_records(
381391
if processing_status == ReportProcessingStatus.DONE:
382392
# retrieve and decrypt the report document
383393
document_id = report_payload["reportDocumentId"]
384-
request_headers = self.request_headers()
385-
request = self._create_prepared_request(
386-
path=self.path(document_id=document_id),
387-
headers=dict(request_headers, **self.authenticator.get_auth_header()),
388-
params=self.request_params(),
389-
)
390-
response = self._send_request(request, {})
394+
response = self._retrieve_report_result(document_id)
395+
391396
for record in self.parse_response(response, stream_state, stream_slice):
392397
if report_end_date:
393398
record["dataEndTime"] = report_end_date.strftime(DATE_FORMAT)
394399
yield record
395400
elif processing_status == ReportProcessingStatus.FATAL:
401+
# retrieve and decrypt the report document
402+
try:
403+
document_id = report_payload["reportDocumentId"]
404+
response = self._retrieve_report_result(document_id)
405+
406+
document = self.download_and_decompress_report_document(response.json())
407+
error_response = json.loads(document)
408+
except Exception as e:
409+
logging.error(f"Failed to retrieve the report result document for stream '{self.name}'. Exception: {e}")
410+
error_response = "Failed to retrieve the report result document."
411+
396412
raise AirbyteTracedException(
397413
internal_message=(
398414
f"Failed to retrieve the report '{self.name}' for period "
399-
f"{stream_slice['dataStartTime']}-{stream_slice['dataEndTime']} "
400-
"due to Amazon Seller Partner platform issues. This will be read during the next sync."
415+
f"{stream_slice['dataStartTime']}-{stream_slice['dataEndTime']}. "
416+
f"This will be read during the next sync. Error: {error_response}"
401417
)
402418
)
403419
elif processing_status == ReportProcessingStatus.CANCELLED:

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/integration/request_builder.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,13 @@
1414

1515

1616
class RequestBuilder:
17-
1817
@classmethod
1918
def auth_endpoint(cls) -> RequestBuilder:
2019
request_headers = {"Content-Type": "application/x-www-form-urlencoded"}
2120
request_body = (
22-
f"grant_type=refresh_token&client_id={LWA_APP_ID}&"
23-
f"client_secret={LWA_CLIENT_SECRET}&refresh_token={REFRESH_TOKEN}"
24-
)
25-
return cls("auth/o2/token").with_base_url("https://api.amazon.com").with_headers(request_headers).with_body(
26-
request_body
21+
f"grant_type=refresh_token&client_id={LWA_APP_ID}&" f"client_secret={LWA_CLIENT_SECRET}&refresh_token={REFRESH_TOKEN}"
2722
)
23+
return cls("auth/o2/token").with_base_url("https://api.amazon.com").with_headers(request_headers).with_body(request_body)
2824

2925
@classmethod
3026
def create_report_endpoint(cls, report_name: str) -> RequestBuilder:

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/integration/test_report_based_streams.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,11 @@ def _check_report_status_response(
118118
"dataEndTime": CONFIG_END_DATE,
119119
"createdTime": CONFIG_START_DATE,
120120
"dataStartTime": CONFIG_START_DATE,
121+
"reportDocumentId": report_document_id,
121122
}
122123
if processing_status == ReportProcessingStatus.DONE:
123124
response_body.update(
124125
{
125-
"reportDocumentId": report_document_id,
126126
"processingEndTime": CONFIG_START_DATE,
127127
"processingStartTime": CONFIG_START_DATE,
128128
}
@@ -141,18 +141,22 @@ def _get_document_download_url_response(
141141
return build_response(response_body, status_code=HTTPStatus.OK)
142142

143143

144-
def _download_document_response(
145-
stream_name: str, data_format: Optional[str] = "csv", compressed: Optional[bool] = False
146-
) -> HttpResponse:
144+
def _download_document_response(stream_name: str, data_format: Optional[str] = "csv", compressed: Optional[bool] = False) -> HttpResponse:
147145
response_body = find_template(stream_name, __file__, data_format)
148146
if compressed:
149147
response_body = gzip.compress(response_body.encode("iso-8859-1"))
150148
return HttpResponse(body=response_body, status_code=HTTPStatus.OK)
151149

152150

151+
def _download_document_error_response(compressed: Optional[bool] = False) -> HttpResponse:
152+
response_body = '{"errorDetails":"Error in report request: This report type requires the reportPeriod, distributorView, sellingProgram reportOption to be specified. Please review the document for this report type on GitHub, provide a value for this reportOption in your request, and try again."}'
153+
if compressed:
154+
response_body = gzip.compress(response_body.encode("iso-8859-1"))
155+
return HttpResponse(body=response_body, status_code=HTTPStatus.OK)
156+
157+
153158
@freezegun.freeze_time(NOW.isoformat())
154159
class TestFullRefresh:
155-
156160
@staticmethod
157161
def _read(stream_name: str, config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
158162
return read_output(
@@ -164,9 +168,7 @@ def _read(stream_name: str, config_: ConfigBuilder, expecting_exception: bool =
164168

165169
@pytest.mark.parametrize(("stream_name", "data_format"), STREAMS)
166170
@HttpMocker()
167-
def test_given_report_when_read_then_return_records(
168-
self, stream_name: str, data_format: str, http_mocker: HttpMocker
169-
) -> None:
171+
def test_given_report_when_read_then_return_records(self, stream_name: str, data_format: str, http_mocker: HttpMocker) -> None:
170172
mock_auth(http_mocker)
171173

172174
http_mocker.post(_create_report_request(stream_name).build(), _create_report_response(_REPORT_ID))
@@ -329,9 +331,7 @@ def test_given_report_access_forbidden_when_read_then_no_records_and_error_logge
329331
) -> None:
330332
mock_auth(http_mocker)
331333

332-
http_mocker.post(
333-
_create_report_request(stream_name).build(), response_with_status(status_code=HTTPStatus.FORBIDDEN)
334-
)
334+
http_mocker.post(_create_report_request(stream_name).build(), response_with_status(status_code=HTTPStatus.FORBIDDEN))
335335

336336
output = self._read(stream_name, config())
337337
message_on_access_forbidden = (
@@ -354,9 +354,7 @@ def test_given_report_status_cancelled_when_read_then_stream_completed_successfu
354354
_check_report_status_response(stream_name, processing_status=ReportProcessingStatus.CANCELLED),
355355
)
356356

357-
message_on_report_cancelled = (
358-
f"The report for stream '{stream_name}' was cancelled or there is no data to return."
359-
)
357+
message_on_report_cancelled = f"The report for stream '{stream_name}' was cancelled or there is no data to return."
360358

361359
output = self._read(stream_name, config())
362360
assert_message_in_log_output(message_on_report_cancelled, output)
@@ -372,14 +370,27 @@ def test_given_report_status_fatal_when_read_then_exception_raised(
372370
http_mocker.post(_create_report_request(stream_name).build(), _create_report_response(_REPORT_ID))
373371
http_mocker.get(
374372
_check_report_status_request(_REPORT_ID).build(),
375-
_check_report_status_response(stream_name, processing_status=ReportProcessingStatus.FATAL),
373+
_check_report_status_response(
374+
stream_name, processing_status=ReportProcessingStatus.FATAL, report_document_id=_REPORT_DOCUMENT_ID
375+
),
376+
)
377+
378+
http_mocker.get(
379+
_get_document_download_url_request(_REPORT_DOCUMENT_ID).build(),
380+
_get_document_download_url_response(_DOCUMENT_DOWNLOAD_URL, _REPORT_DOCUMENT_ID),
381+
)
382+
http_mocker.get(
383+
_download_document_request(_DOCUMENT_DOWNLOAD_URL).build(),
384+
[
385+
response_with_status(status_code=HTTPStatus.INTERNAL_SERVER_ERROR),
386+
_download_document_error_response(),
387+
],
376388
)
377389

378390
output = self._read(stream_name, config(), expecting_exception=True)
379391
assert output.errors[-1].trace.error.failure_type == FailureType.config_error
380392
assert (
381-
f"Failed to retrieve the report '{stream_name}' for period {CONFIG_START_DATE}-{CONFIG_END_DATE} "
382-
"due to Amazon Seller Partner platform issues. This will be read during the next sync."
393+
f"Failed to retrieve the report '{stream_name}' for period {CONFIG_START_DATE}-{CONFIG_END_DATE}. This will be read during the next sync. Error: {{'errorDetails': 'Error in report request: This report type requires the reportPeriod, distributorView, sellingProgram reportOption to be specified. Please review the document for this report type on GitHub, provide a value for this reportOption in your request, and try again.'}}"
383394
) in output.errors[-1].trace.error.message
384395

385396
@pytest.mark.parametrize(
@@ -405,9 +416,7 @@ def test_given_report_with_incorrect_date_format_when_read_then_formatted(
405416
_get_document_download_url_request(_REPORT_DOCUMENT_ID).build(),
406417
_get_document_download_url_response(_DOCUMENT_DOWNLOAD_URL, _REPORT_DOCUMENT_ID),
407418
)
408-
http_mocker.get(
409-
_download_document_request(_DOCUMENT_DOWNLOAD_URL).build(), _download_document_response(stream_name)
410-
)
419+
http_mocker.get(_download_document_request(_DOCUMENT_DOWNLOAD_URL).build(), _download_document_response(stream_name))
411420

412421
output = self._read(stream_name, config())
413422
assert len(output.records) == DEFAULT_EXPECTED_NUMBER_OF_RECORDS
@@ -425,9 +434,7 @@ def test_given_http_error_500_on_create_report_when_read_then_no_records_and_err
425434
response_with_status(status_code=HTTPStatus.INTERNAL_SERVER_ERROR),
426435
)
427436

428-
message_on_backoff_exception = (
429-
f"The report for stream '{stream_name}' was cancelled due to several failed retry attempts."
430-
)
437+
message_on_backoff_exception = f"The report for stream '{stream_name}' was cancelled due to several failed retry attempts."
431438

432439
output = self._read(stream_name, config())
433440
assert_message_in_log_output(message_on_backoff_exception, output)

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/integration/test_vendor_direct_fulfillment_shipping.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def _shipping_label_record() -> RecordBuilder:
6363

6464
@freezegun.freeze_time(NOW.isoformat())
6565
class TestFullRefresh:
66-
6766
@staticmethod
6867
def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
6968
return read_output(
@@ -89,9 +88,7 @@ def test_given_two_pages_when_read_then_return_records(self, http_mocker: HttpMo
8988
mock_auth(http_mocker)
9089
http_mocker.get(
9190
_vendor_direct_fulfillment_shipping_request().build(),
92-
_vendor_direct_fulfillment_shipping_response().with_pagination().with_record(
93-
_shipping_label_record()
94-
).build(),
91+
_vendor_direct_fulfillment_shipping_response().with_pagination().with_record(_shipping_label_record()).build(),
9592
)
9693
query_params_with_next_page_token = {
9794
_REPLICATION_START_FIELD: _START_DATE.strftime(TIME_FORMAT),
@@ -100,9 +97,10 @@ def test_given_two_pages_when_read_then_return_records(self, http_mocker: HttpMo
10097
}
10198
http_mocker.get(
10299
_vendor_direct_fulfillment_shipping_request().with_query_params(query_params_with_next_page_token).build(),
103-
_vendor_direct_fulfillment_shipping_response().with_record(_shipping_label_record()).with_record(
104-
_shipping_label_record()
105-
).build(),
100+
_vendor_direct_fulfillment_shipping_response()
101+
.with_record(_shipping_label_record())
102+
.with_record(_shipping_label_record())
103+
.build(),
106104
)
107105

108106
output = self._read(config().with_start_date(_START_DATE).with_end_date(_END_DATE))
@@ -135,9 +133,7 @@ def test_given_two_slices_when_read_then_return_records(self, http_mocker: HttpM
135133
assert len(output.records) == 2
136134

137135
@HttpMocker()
138-
def test_given_http_status_500_then_200_when_read_then_retry_and_return_records(
139-
self, http_mocker: HttpMocker
140-
) -> None:
136+
def test_given_http_status_500_then_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None:
141137
mock_auth(http_mocker)
142138
http_mocker.get(
143139
_vendor_direct_fulfillment_shipping_request().build(),
@@ -151,9 +147,7 @@ def test_given_http_status_500_then_200_when_read_then_retry_and_return_records(
151147
assert len(output.records) == 1
152148

153149
@HttpMocker()
154-
def test_given_http_status_500_on_availability_when_read_then_raise_system_error(
155-
self, http_mocker: HttpMocker
156-
) -> None:
150+
def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None:
157151
mock_auth(http_mocker)
158152
http_mocker.get(
159153
_vendor_direct_fulfillment_shipping_request().build(),
@@ -166,7 +160,6 @@ def test_given_http_status_500_on_availability_when_read_then_raise_system_error
166160

167161
@freezegun.freeze_time(NOW.isoformat())
168162
class TestIncremental:
169-
170163
@staticmethod
171164
def _read(
172165
config_: ConfigBuilder, state: Optional[List[AirbyteStateMessage]] = None, expecting_exception: bool = False
@@ -196,9 +189,10 @@ def test_when_read_then_state_message_produced_and_state_match_latest_record(sel
196189
mock_auth(http_mocker)
197190
http_mocker.get(
198191
_vendor_direct_fulfillment_shipping_request().build(),
199-
_vendor_direct_fulfillment_shipping_response().with_record(_shipping_label_record()).with_record(
200-
_shipping_label_record()
201-
).build(),
192+
_vendor_direct_fulfillment_shipping_response()
193+
.with_record(_shipping_label_record())
194+
.with_record(_shipping_label_record())
195+
.build(),
202196
)
203197

204198
output = self._read(config().with_start_date(_START_DATE).with_end_date(_END_DATE))
@@ -217,21 +211,21 @@ def test_given_state_when_read_then_state_value_is_created_after_query_param(sel
217211
_REPLICATION_START_FIELD: _START_DATE.strftime(TIME_FORMAT),
218212
_REPLICATION_END_FIELD: _END_DATE.strftime(TIME_FORMAT),
219213
}
220-
query_params_incremental_read = {
221-
_REPLICATION_START_FIELD: state_value, _REPLICATION_END_FIELD: _END_DATE.strftime(TIME_FORMAT)
222-
}
214+
query_params_incremental_read = {_REPLICATION_START_FIELD: state_value, _REPLICATION_END_FIELD: _END_DATE.strftime(TIME_FORMAT)}
223215

224216
http_mocker.get(
225217
_vendor_direct_fulfillment_shipping_request().with_query_params(query_params_first_read).build(),
226-
_vendor_direct_fulfillment_shipping_response().with_record(_shipping_label_record()).with_record(
227-
_shipping_label_record()
228-
).build(),
218+
_vendor_direct_fulfillment_shipping_response()
219+
.with_record(_shipping_label_record())
220+
.with_record(_shipping_label_record())
221+
.build(),
229222
)
230223
http_mocker.get(
231224
_vendor_direct_fulfillment_shipping_request().with_query_params(query_params_incremental_read).build(),
232-
_vendor_direct_fulfillment_shipping_response().with_record(_shipping_label_record()).with_record(
233-
_shipping_label_record()
234-
).build(),
225+
_vendor_direct_fulfillment_shipping_response()
226+
.with_record(_shipping_label_record())
227+
.with_record(_shipping_label_record())
228+
.build(),
235229
)
236230

237231
output = self._read(

0 commit comments

Comments
 (0)