Skip to content

Commit f281f8f

Browse files
authored
🐛 Source Amazon Ads - Generate slices by lazy evaluation (#15637)
Signed-off-by: Sergey Chvalyuk <[email protected]>
1 parent 4e6cb05 commit f281f8f

File tree

8 files changed

+103
-24
lines changed

8 files changed

+103
-24
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
- name: Amazon Ads
1818
sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
1919
dockerRepository: airbyte/source-amazon-ads
20-
dockerImageTag: 0.1.12
20+
dockerImageTag: 0.1.14
2121
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-ads
2222
icon: amazonads.svg
2323
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@
8787
supportsNormalization: false
8888
supportsDBT: false
8989
supported_destination_sync_modes: []
90-
- dockerImage: "airbyte/source-amazon-ads:0.1.12"
90+
- dockerImage: "airbyte/source-amazon-ads:0.1.14"
9191
spec:
9292
documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads"
9393
connectionSpecification:

airbyte-integrations/connectors/source-amazon-ads/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.1.12
15+
LABEL io.airbyte.version=0.1.14
1616
LABEL io.airbyte.name=airbyte/source-amazon-ads

airbyte-integrations/connectors/source-amazon-ads/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"pytest-mock~=3.7.0",
1313
"jsonschema~=3.2.0",
1414
"responses~=0.13.3",
15-
"freezegun~=1.1.0",
15+
"freezegun~=1.2.0",
1616
]
1717

1818
setup(

airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from pydantic import BaseModel
2222
from source_amazon_ads.schemas import CatalogModel, MetricsReport, Profile
2323
from source_amazon_ads.streams.common import BasicAmazonAdsStream
24+
from source_amazon_ads.utils import iterate_one_by_one
2425

2526
logger = AirbyteLogger()
2627

@@ -92,6 +93,8 @@ class ReportStream(BasicAmazonAdsStream, ABC):
9293
primary_key = ["profileId", "recordType", "reportDate", "updatedAt"]
9394
# Amazon ads updates the data for the next 3 days
9495
LOOK_BACK_WINDOW = 3
96+
# https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api
97+
REPORTING_PERIOD = 60
9598
# (Service limits section)
9699
# Format used to specify metric generation date over Amazon Ads API.
97100
REPORT_DATE_FORMAT = "YYYYMMDD"
@@ -265,35 +268,42 @@ def _send_http_request(self, url: str, profile_id: int, json: dict = None):
265268
raise TooManyRequests()
266269
return response
267270

268-
def get_date_range(self, start_date: Date, end_date: Date) -> Iterable[str]:
269-
for days in range((end_date - start_date).days + 1):
270-
yield start_date.add(days=days).format(ReportStream.REPORT_DATE_FORMAT)
271+
def get_date_range(self, start_date: Date, timezone: str) -> Iterable[str]:
    """
    Lazily yield report dates, one per day, beginning at `start_date`.

    "Today" in `timezone` is looked up again before every date is produced,
    so if the calendar day rolls over while the consumer is still iterating,
    the newly started day is yielded as well.

    :param start_date: first date to produce.
    :param timezone: timezone name used to determine the current date.
    :return: generator of dates formatted with REPORT_DATE_FORMAT; produces
        nothing when `start_date` is already later than today.
    """
    current_date = start_date
    # The upper bound is re-evaluated on purpose on each pass (lazy evaluation).
    while current_date <= pendulum.today(tz=timezone).date():
        yield current_date.format(self.REPORT_DATE_FORMAT)
        current_date = current_date.add(days=1)
271277

272278
def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> Date:
273279
today = pendulum.today(tz=profile.timezone).date()
274280
start_date = stream_state.get(str(profile.profileId), {}).get(self.cursor_field)
275281
if start_date:
276282
start_date = pendulum.from_format(start_date, self.REPORT_DATE_FORMAT).date()
277-
return max(start_date, today.subtract(days=60))
283+
return max(start_date, today.subtract(days=self.REPORTING_PERIOD))
278284
if self._start_date:
279-
return max(self._start_date, today.subtract(days=60))
285+
return max(self._start_date, today.subtract(days=self.REPORTING_PERIOD))
280286
return today
281287

288+
def stream_profile_slices(self, profile: Profile, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    """
    Lazily generate the stream slices of a single profile.

    :param profile: profile the slices are generated for.
    :param stream_state: current stream state, passed to get_start_date to resolve the first report date.
    :return: generator of slices, each holding the profile and one report date under the cursor field.
    """
    first_report_date = self.get_start_date(profile, stream_state)
    report_dates = self.get_date_range(first_report_date, profile.timezone)
    # A generator expression keeps slice creation lazy: one slice is built per requested date.
    yield from ({"profile": profile, self.cursor_field: report_date} for report_date in report_dates)
292+
282293
def stream_slices(
283294
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
284295
) -> Iterable[Optional[Mapping[str, Any]]]:
285296

286297
stream_state = stream_state or {}
298+
no_data = True
299+
300+
generators = [self.stream_profile_slices(profile, stream_state) for profile in self._profiles]
301+
for _slice in iterate_one_by_one(*generators):
302+
no_data = False
303+
yield _slice
287304

288-
slices = []
289-
for profile in self._profiles:
290-
today = pendulum.today(tz=profile.timezone).date()
291-
start_date = self.get_start_date(profile, stream_state)
292-
for report_date in self.get_date_range(start_date, today):
293-
slices.append({"profile": profile, self.cursor_field: report_date})
294-
if not slices:
295-
return [None]
296-
return slices
305+
if no_data:
306+
yield None
297307

298308
def get_updated_state(self, current_stream_state: Dict[str, Any], latest_data: Mapping[str, Any]) -> Mapping[str, Any]:
299309
profileId = str(latest_data["profileId"])
airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
6+
def iterate_one_by_one(*iterables):
    """
    Yield items from several iterables in round-robin order.

    One item is taken from each iterable in turn. An exhausted iterable is
    dropped and the rotation continues with the remaining ones until all of
    them are exhausted. Items are pulled lazily: a source is advanced only
    when its turn comes.

    :param iterables: any iterables (generators, iterators, lists, strings, ...).
    :return: generator over the interleaved items.
    """
    # Imported locally so the function does not depend on a module-level import block.
    from collections import deque

    # iter() allows plain iterables such as lists, not just iterators;
    # for a generator or iterator it returns the very same object.
    pending = deque(iter(iterable) for iterable in iterables)
    while pending:
        current = pending.popleft()  # O(1), unlike list.pop(0)
        try:
            item = next(current)
        except StopIteration:
            # This source is exhausted: do not put it back into the rotation.
            continue
        yield item
        pending.append(current)

airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import re
66
from base64 import b64decode
7+
from datetime import timedelta
8+
from functools import partial
79
from unittest import mock
810

911
import pytest
@@ -301,7 +303,7 @@ def __call__(self, request):
301303
def test_display_report_stream_slices_full_refresh(config):
302304
profiles = make_profiles()
303305
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
304-
slices = stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field)
306+
slices = list(stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field))
305307
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]
306308

307309

@@ -311,7 +313,7 @@ def test_display_report_stream_slices_incremental(config):
311313
profiles = make_profiles()
312314
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
313315
stream_state = {str(profiles[0].profileId): {"reportDate": "20210725"}}
314-
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)
316+
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state))
315317
assert slices == [
316318
{"profile": profiles[0], "reportDate": "20210725"},
317319
{"profile": profiles[0], "reportDate": "20210726"},
@@ -321,13 +323,13 @@ def test_display_report_stream_slices_incremental(config):
321323
]
322324

323325
stream_state = {str(profiles[0].profileId): {"reportDate": "20210730"}}
324-
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)
326+
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state))
325327
assert slices == [None]
326328

327-
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})
329+
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}))
328330
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]
329331

330-
slices = stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={})
332+
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={}))
331333
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]
332334

333335

@@ -358,5 +360,56 @@ def test_stream_slices_different_timezones(config):
358360
profile1 = Profile(profileId=1, timezone="America/Los_Angeles", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
359361
profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
360362
stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock())
361-
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})
363+
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}))
362364
assert slices == [{"profile": profile1, "reportDate": "20210731"}, {"profile": profile2, "reportDate": "20210801"}]
365+
366+
367+
def test_stream_slices_lazy_evaluation(config):
    """
    Slices must be produced lazily, alternating between profiles.

    The clock starts ten minutes before midnight UTC and is advanced by ten
    minutes after every consumed slice, so the calendar day changes while the
    slices are still being generated.
    """

    def make_profile(profile_id):
        return Profile(profileId=profile_id, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))

    with freeze_time("2022-06-01T23:50:00+00:00") as clock:
        config["start_date"] = "2021-05-10"
        profile1, profile2 = make_profile(1), make_profile(2)

        stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock())
        stream.REPORTING_PERIOD = 5

        collected = []
        slice_generator = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field)
        for stream_slice in slice_generator:
            collected.append(stream_slice)
            # Move the clock between slices so that a lazy generator observes the day change.
            clock.tick(delta=timedelta(minutes=10))

        assert collected == [
            {"profile": profile1, "reportDate": "20220527"},
            {"profile": profile2, "reportDate": "20220528"},
            {"profile": profile1, "reportDate": "20220528"},
            {"profile": profile2, "reportDate": "20220529"},
            {"profile": profile1, "reportDate": "20220529"},
            {"profile": profile2, "reportDate": "20220530"},
            {"profile": profile1, "reportDate": "20220530"},
            {"profile": profile2, "reportDate": "20220531"},
            {"profile": profile1, "reportDate": "20220531"},
            {"profile": profile2, "reportDate": "20220601"},
            {"profile": profile1, "reportDate": "20220601"},
            {"profile": profile2, "reportDate": "20220602"},
            {"profile": profile1, "reportDate": "20220602"},
        ]
396+
397+
398+
def test_get_date_range_lazy_evaluation():
    """
    get_date_range must stop at "today" and re-check "today" on every step.
    """
    # The class itself is bound in place of `self`.
    get_date_range = partial(SponsoredProductsReportStream.get_date_range, SponsoredProductsReportStream)

    with freeze_time("2022-06-01T12:00:00+00:00") as clock:
        # Consumed in one go: the clock never moves, so the range ends at the frozen current date.
        eager_cases = [
            (Date(2022, 5, 29), ["20220529", "20220530", "20220531", "20220601"]),
            (Date(2022, 6, 1), ["20220601"]),
            (Date(2022, 6, 2), []),
        ]
        for first_day, expected in eager_cases:
            assert list(get_date_range(start_date=first_day, timezone="UTC")) == expected

        # Consumed step by step with the clock advancing: the day that starts meanwhile is included.
        produced = []
        for report_date in get_date_range(start_date=Date(2022, 5, 29), timezone="UTC"):
            produced.append(report_date)
            clock.tick(delta=timedelta(hours=3))
        assert produced == ["20220529", "20220530", "20220531", "20220601", "20220602"]

docs/integrations/sources/amazon-ads.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ Information about expected report generation waiting time you may find [here](ht
9090

9191
| Version | Date | Pull Request | Subject |
9292
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------|
93+
| 0.1.14 | 2022-08-15 | [15637](https://github.com/airbytehq/airbyte/pull/15637) | Generate slices by lazy evaluation |
9394
| 0.1.12 | 2022-08-09 | [15469](https://github.com/airbytehq/airbyte/pull/15469) | Define primary_key for all report streams |
9495
| 0.1.11 | 2022-07-28 | [15031](https://github.com/airbytehq/airbyte/pull/15031) | Improve report streams date-range generation |
9596
| 0.1.10 | 2022-07-26 | [15042](https://github.com/airbytehq/airbyte/pull/15042) | Update `additionalProperties` field to true from schemas |

0 commit comments

Comments
 (0)