diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 2a85be90bf6ad..3f80ef0ddbb43 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -240,7 +240,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.48 + dockerImageTag: 0.2.49 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e759d939df731..b9903cc909926 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1781,7 +1781,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.48" +- dockerImage: "airbyte/source-facebook-marketing:0.2.49" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 4f2bc54593430..6568229505083 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.48 +LABEL io.airbyte.version=0.2.49 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py index 0aa2378760889..b0a07c153b41a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py @@ -102,9 +102,20 @@ def read_records( stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: """Waits for current job to finish (slice) and yield its result""" + + today = pendulum.today(tz="UTC").date() + date_start = stream_state and stream_state.get("date_start") + if date_start: + date_start = pendulum.parse(date_start).date() + job = stream_slice["insight_job"] for obj in job.get_result(): - yield obj.export_all_data() + record = obj.export_all_data() + if date_start: + updated_time = pendulum.parse(record["updated_time"]).date() + if updated_time <= date_start or updated_time >= today: + continue + yield record self._completed_slices.add(job.interval.start) if job.interval.start == self._next_cursor_value: @@ -152,9 +163,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late def _date_intervals(self) -> Iterator[pendulum.Date]: """Get date period to sync""" - if self._end_date < self._next_cursor_value: + yesterday = pendulum.yesterday(tz="UTC").date() + end_date = min(self._end_date, yesterday) + if end_date < self._next_cursor_value: return - date_range = self._end_date - self._next_cursor_value + date_range = end_date - self._next_cursor_value yield from date_range.range("days", self.time_increment) def _advance_cursor(self): @@ -173,9 +186,14 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]: :return: """ + today = pendulum.today(tz="UTC").date() + refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD + for ts_start in self._date_intervals(): if ts_start in self._completed_slices: - continue + if ts_start < refresh_date: + continue + self._completed_slices.remove(ts_start) ts_end = ts_start + pendulum.duration(days=self.time_increment - 1) interval = pendulum.Period(ts_start, ts_end) yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params) @@ -215,7 +233,7 @@ def _get_start_date(self) -> pendulum.Date: :return: the first date to sync """ - today = pendulum.today().date() + today = pendulum.today(tz="UTC").date() oldest_date = today - self.INSIGHTS_RETENTION_PERIOD refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py index 6f9e1bd563dbe..472a7ff7c2514 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py @@ -54,3 +54,15 @@ def api_fixture(some_config, requests_mock, fb_account_response): requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response]) requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response]) return api + + +@fixture +def set_today(mocker, monkeypatch): + def inner(date: str): + today = pendulum.parse(date) + yesterday = today - pendulum.duration(days=1) + monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=today)) + monkeypatch.setattr(pendulum, "yesterday", mocker.MagicMock(return_value=yesterday)) + return yesterday, today + + return inner diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/helpers.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/helpers.py new file mode 100644 index 0000000000000..4d9098f381c13 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/helpers.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from typing import Any, MutableMapping + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream + + +def read_full_refresh(stream_instance: Stream): + records = [] + slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh) + for slice in slices: + records.extend(list(stream_instance.read_records(stream_slice=slice, sync_mode=SyncMode.full_refresh))) + return records + + +def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]): + records = [] + stream_instance.state = stream_state + slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state) + for slice in slices: + records.extend(list(stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state))) + stream_state.clear() + stream_state.update(stream_instance.state) + return records + + +class FakeInsightAsyncJobManager: + def __init__(self, jobs, **kwargs): + self.jobs = jobs + + def completed_jobs(self): + yield from self.jobs + + +class FakeInsightAsyncJob: + updated_insights = {} + + @classmethod + def update_insight(cls, date_start, updated_time): + cls.updated_insights[date_start] = updated_time + + def __init__(self, interval, **kwargs): + self.interval = interval + + def get_result(self): + return [self] + + def export_all_data(self): + date_start = str(self.interval.start) + return {"date_start": date_start, "updated_time": self.updated_insights.get(date_start, date_start)} diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_insight_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_insight_streams.py index 4ba21fc0d3417..ffc1377632f38 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_insight_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_insight_streams.py @@ -6,7 +6,9 @@ import pendulum import pytest +import source_facebook_marketing.streams.base_insight_streams from airbyte_cdk.models import SyncMode +from helpers import FakeInsightAsyncJob, FakeInsightAsyncJobManager, read_full_refresh, read_incremental from pendulum import duration from source_facebook_marketing.streams import AdsInsights from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob @@ -186,7 +188,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece async_manager_mock.assert_called_once() args, kwargs = async_manager_mock.call_args generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - start_date).days + 1 + assert len(generated_jobs) == (end_date - start_date).days assert generated_jobs[0].interval.start == start_date.date() assert generated_jobs[1].interval.start == start_date.date() + duration(days=1) @@ -223,7 +225,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re async_manager_mock.assert_called_once() args, kwargs = async_manager_mock.call_args generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - start_date).days + 1 + assert len(generated_jobs) == (end_date - start_date).days assert generated_jobs[0].interval.start == start_date.date() assert generated_jobs[1].interval.start == start_date.date() + duration(days=1) @@ -290,3 +292,72 @@ def test_fields_custom(self, api): ) assert stream.fields == ["account_id", "account_currency"] + + def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today): + start_date = pendulum.parse("2020-03-01") + end_date = pendulum.parse("2020-05-01") + set_today("2020-04-01") + monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10)) + monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob) + monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager) + + state = { + AdsInsights.cursor_field: "2020-03-19", + "slices": [ + "2020-03-21", + "2020-03-22", + "2020-03-23", + ], + "time_increment": 1, + } + + stream = AdsInsights(api=api, start_date=start_date, end_date=end_date) + stream.state = state + assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)} + + slices = stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental) + slices = [x["insight_job"].interval.start for x in slices] + + assert pendulum.parse("2020-03-21").date() not in slices + assert pendulum.parse("2020-03-22").date() in slices + assert pendulum.parse("2020-03-23").date() in slices + assert stream._completed_slices == {pendulum.Date(2020, 3, 21)} + + def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today): + start_date = pendulum.parse("2020-03-01") + end_date = pendulum.parse("2020-05-01") + yesterday, _ = set_today("2020-04-01") + monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20)) + monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob) + monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager) + + stream = AdsInsights(api=api, start_date=start_date, end_date=end_date) + + records = read_full_refresh(stream) + assert len(records) == (yesterday - start_date).days + 1 + assert records[0]["date_start"] == str(start_date.date()) + assert records[-1]["date_start"] == str(yesterday.date()) + + state = {AdsInsights.cursor_field: "2020-03-20", "time_increment": 1} + records = read_incremental(stream, state) + assert len(records) == (yesterday - pendulum.parse("2020-03-20")).days + assert records[0]["date_start"] == "2020-03-21" + assert records[-1]["date_start"] == str(yesterday.date()) + assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1} + + yesterday, _ = set_today("2020-04-02") + records = read_incremental(stream, state) + assert records == [{"date_start": str(yesterday.date()), "updated_time": str(yesterday.date())}] + assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1} + + yesterday, _ = set_today("2020-04-03") + FakeInsightAsyncJob.update_insight("2020-03-26", "2020-04-01") + FakeInsightAsyncJob.update_insight("2020-03-27", "2020-04-02") + FakeInsightAsyncJob.update_insight("2020-03-28", "2020-04-03") + + records = read_incremental(stream, state) + assert records == [ + {"date_start": "2020-03-27", "updated_time": "2020-04-02"}, + {"date_start": "2020-04-02", "updated_time": "2020-04-02"}, + ] + assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1} diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index fcd744dafd05d..0f0552bc1adb9 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.49 | 2022-05-20 | [13047](https://github.com/airbytehq/airbyte/pull/13047) | Fix duplicating records during insights lookback period | | 0.2.48 | 2022-05-19 | [13008](https://github.com/airbytehq/airbyte/pull/13008) | Update CDK to v0.1.58 avoid crashing on incorrect stream schemas | | 0.2.47 | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions | | 0.2.46 | 2022-04-22 | [12171](https://github.com/airbytehq/airbyte/pull/12171) | Allow configuration of page_size for requests |