Skip to content

Commit 2293f89

Browse files
🎉 Source Amazon Seller Partner: add replication end date to config (#13059)
1 parent fc29ac9 commit 2293f89

File tree

11 files changed

+69
-11
lines changed

11 files changed

+69
-11
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
- name: Amazon Seller Partner
2626
sourceDefinitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
2727
dockerRepository: airbyte/source-amazon-seller-partner
28-
dockerImageTag: 0.2.19
28+
dockerImageTag: 0.2.20
2929
sourceType: api
3030
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-seller-partner
3131
icon: amazonsellerpartner.svg

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+9-1
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@
213213
type: "string"
214214
path_in_connector_config:
215215
- "client_secret"
216-
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.19"
216+
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.20"
217217
spec:
218218
documentationUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
219219
changelogUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
@@ -279,6 +279,14 @@
279279
examples:
280280
- "2017-01-25T00:00:00Z"
281281
type: "string"
282+
replication_end_date:
283+
title: "End Date"
284+
description: "UTC date and time in the format 2017-01-25T00:00:00Z. Any\
285+
\ data after this date will not be replicated."
286+
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$|^$"
287+
examples:
288+
- "2017-01-25T00:00:00Z"
289+
type: "string"
282290
period_in_days:
283291
title: "Period In Days"
284292
description: "Will be used for stream slicing for initial full_refresh sync\

airbyte-integrations/connectors/source-amazon-seller-partner/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.2.19
15+
LABEL io.airbyte.version=0.2.20
1616
LABEL io.airbyte.name=airbyte/source-amazon-seller-partner

airbyte-integrations/connectors/source-amazon-seller-partner/integration_tests/spec.json

+7
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@
6666
"examples": ["2017-01-25T00:00:00Z"],
6767
"type": "string"
6868
},
69+
"replication_end_date": {
70+
"title": "End Date",
71+
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data after this date will not be replicated.",
72+
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$|^$",
73+
"examples": ["2017-01-25T00:00:00Z"],
74+
"type": "string"
75+
},
6976
"period_in_days": {
7077
"title": "Period In Days",
7178
"description": "Will be used for stream slicing for initial full_refresh sync when no updated state is present for reports that support sliced incremental sync.",

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/source.py

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def _get_stream_kwargs(self, config: AmazonSellerPartnerConfig) -> Mapping[str,
6565
"period_in_days": config.period_in_days,
6666
"report_options": config.report_options,
6767
"max_wait_seconds": config.max_wait_seconds,
68+
"replication_end_date": config.replication_end_date,
6869
}
6970
return stream_kwargs
7071

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/spec.py

+9
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ class Config:
5353
pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
5454
examples=["2017-01-25T00:00:00Z"],
5555
)
56+
57+
replication_end_date: str = Field(
58+
None,
59+
description="UTC date and time in the format 2017-01-25T00:00:00Z. Any data after this date will not be replicated.",
60+
title="End Date",
61+
pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$|^$",
62+
examples=["2017-01-25T00:00:00Z"],
63+
)
64+
5665
period_in_days: int = Field(
5766
30,
5867
description="Will be used for stream slicing for initial full_refresh sync when no updated state is present for reports that support sliced incremental sync.",

airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py

+37-8
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ def __init__(
4646
period_in_days: Optional[int],
4747
report_options: Optional[str],
4848
max_wait_seconds: Optional[int],
49+
replication_end_date: Optional[str],
4950
*args,
5051
**kwargs,
5152
):
5253
super().__init__(*args, **kwargs)
5354

5455
self._url_base = url_base.rstrip("/") + "/"
5556
self._replication_start_date = replication_start_date
57+
self._replication_end_date = replication_end_date
5658
self.marketplace_id = marketplace_id
5759
self._session.auth = aws_signature
5860

@@ -75,6 +77,11 @@ class IncrementalAmazonSPStream(AmazonSPStream, ABC):
7577
def replication_start_date_field(self) -> str:
7678
pass
7779

80+
@property
81+
@abstractmethod
82+
def replication_end_date_field(self) -> str:
83+
pass
84+
7885
@property
7986
@abstractmethod
8087
def next_page_token_field(self) -> str:
@@ -97,9 +104,14 @@ def request_params(
97104
return dict(next_page_token)
98105

99106
params = {self.replication_start_date_field: self._replication_start_date, self.page_size_field: self.page_size}
107+
100108
if self._replication_start_date and self.cursor_field:
101109
start_date = max(stream_state.get(self.cursor_field, self._replication_start_date), self._replication_start_date)
102110
params.update({self.replication_start_date_field: start_date})
111+
112+
if self._replication_end_date:
113+
params[self.replication_end_date_field] = self._replication_end_date
114+
103115
return params
104116

105117
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
@@ -154,13 +166,15 @@ def __init__(
154166
period_in_days: Optional[int],
155167
report_options: Optional[str],
156168
max_wait_seconds: Optional[int],
169+
replication_end_date: Optional[str],
157170
authenticator: HttpAuthenticator = None,
158171
):
159172
self._authenticator = authenticator
160173
self._session = requests.Session()
161174
self._url_base = url_base.rstrip("/") + "/"
162175
self._session.auth = aws_signature
163176
self._replication_start_date = replication_start_date
177+
self._replication_end_date = replication_end_date
164178
self.marketplace_id = marketplace_id
165179
self.period_in_days = period_in_days
166180
self._report_options = report_options
@@ -223,12 +237,21 @@ def _report_data(
223237
) -> Mapping[str, Any]:
224238
replication_start_date = max(pendulum.parse(self._replication_start_date), pendulum.now("utc").subtract(days=90))
225239

226-
return {
240+
params = {
227241
"reportType": self.name,
228242
"marketplaceIds": [self.marketplace_id],
229243
"dataStartTime": replication_start_date.strftime(DATE_TIME_FORMAT),
230244
}
231245

246+
if self._replication_end_date and sync_mode == SyncMode.full_refresh:
247+
params["dataEndTime"] = self._replication_end_date
248+
# if replication_start_date is older than 90 days(from current date), we are overriding the value above.
249+
# when replication_end_date is present, we should use the user provided replication_start_date.
250+
# user may provide a date range which is older than 90 days.
251+
params["dataStartTime"] = self._replication_start_date
252+
253+
return params
254+
232255
def _create_report(
233256
self,
234257
sync_mode: SyncMode,
@@ -539,6 +562,8 @@ def stream_slices(
539562

540563
start_date = pendulum.parse(self._replication_start_date)
541564
end_date = pendulum.now()
565+
if self._replication_end_date and sync_mode == SyncMode.full_refresh:
566+
end_date = pendulum.parse(self._replication_end_date)
542567

543568
if stream_state:
544569
state = stream_state.get(self.cursor_field)
@@ -648,6 +673,7 @@ class Orders(IncrementalAmazonSPStream):
648673
primary_key = "AmazonOrderId"
649674
cursor_field = "LastUpdateDate"
650675
replication_start_date_field = "LastUpdatedAfter"
676+
replication_end_date_field = "LastUpdatedBefore"
651677
next_page_token_field = "NextToken"
652678
page_size_field = "MaxResultsPerPage"
653679
default_backoff_time = 60
@@ -686,16 +712,11 @@ class VendorDirectFulfillmentShipping(AmazonSPStream):
686712
name = "VendorDirectFulfillmentShipping"
687713
primary_key = None
688714
replication_start_date_field = "createdAfter"
715+
replication_end_date_field = "createdBefore"
689716
next_page_token_field = "nextToken"
690717
page_size_field = "limit"
691718
time_format = "%Y-%m-%dT%H:%M:%SZ"
692719

693-
def __init__(self, *args, **kwargs):
694-
super().__init__(*args, **kwargs)
695-
self.replication_start_date_field = max(
696-
pendulum.parse(self._replication_start_date), pendulum.now("utc").subtract(days=7, hours=1)
697-
).strftime(self.time_format)
698-
699720
def path(self, **kwargs) -> str:
700721
return f"vendor/directFulfillment/shipping/{VENDORS_API_VERSION}/shippingLabels"
701722

@@ -704,7 +725,15 @@ def request_params(
704725
) -> MutableMapping[str, Any]:
705726
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
706727
if not next_page_token:
707-
params.update({"createdBefore": pendulum.now("utc").strftime(self.time_format)})
728+
end_date = pendulum.now("utc").strftime(self.time_format)
729+
if self._replication_end_date:
730+
end_date = self._replication_end_date
731+
732+
start_date = max(pendulum.parse(self._replication_start_date), pendulum.parse(end_date).subtract(days=7, hours=1)).strftime(
733+
self.time_format
734+
)
735+
736+
params.update({self.replication_start_date_field: start_date, self.replication_end_date_field: end_date})
708737
return params
709738

710739
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_reports_streams_rate_limits.py

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def reports_stream():
2222
url_base="https://test.url",
2323
aws_signature=aws_signature,
2424
replication_start_date="2017-01-25T00:00:00Z",
25+
replication_end_date="2017-02-25T00:00:00Z",
2526
marketplace_id="id",
2627
authenticator=None,
2728
period_in_days=0,

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_source.py

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def connector_source():
2121
def connector_config():
2222
return AmazonSellerPartnerConfig(
2323
replication_start_date="2017-01-25T00:00:00Z",
24+
replication_end_date="2017-02-25T00:00:00Z",
2425
refresh_token="Atzr|IwEBIP-abc123",
2526
lwa_app_id="amzn1.application-oa2-client.abc123",
2627
lwa_client_secret="abc123",

airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_transform_function.py

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def reports_stream(marketplace_id):
1919
url_base="https://test.url",
2020
aws_signature=aws_signature,
2121
replication_start_date="2010-01-25T00:00:00Z",
22+
replication_end_date="2017-02-25T00:00:00Z",
2223
marketplace_id=marketplace_id,
2324
authenticator=None,
2425
period_in_days=0,

docs/integrations/sources/amazon-seller-partner.md

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ This source is capable of syncing the following tables and their data:
9090

9191
| Version | Date | Pull Request | Subject |
9292
|:---------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------|
93+
| `0.2.20` | 2022-05-30 | [\#13059](https://github.com/airbytehq/airbyte/pull/13059) | Add replication end date to config |
9394
| `0.2.19` | 2022-05-24 | [\#13119](https://github.com/airbytehq/airbyte/pull/13119) | Add OAuth2.0 support |
9495
| `0.2.18` | 2022-05-06 | [\#12663](https://github.com/airbytehq/airbyte/pull/12663) | Add GET_XML_BROWSE_TREE_DATA report |
9596
| `0.2.17` | 2022-05-19 | [\#12946](https://github.com/airbytehq/airbyte/pull/12946) | Add throttling exception managing in Orders streams |

0 commit comments

Comments
(0)