Skip to content

Commit 4c283d7

Browse files
authored
Source Facebook marketing: Fix duplicating records during insights lookback period (#13047)
Signed-off-by: Sergey Chvalyuk <[email protected]>
1 parent e82e6fc commit 4c283d7

File tree

8 files changed

+166
-10
lines changed

8 files changed

+166
-10
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@
240240
- name: Facebook Marketing
241241
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
242242
dockerRepository: airbyte/source-facebook-marketing
243-
dockerImageTag: 0.2.48
243+
dockerImageTag: 0.2.49
244244
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
245245
icon: facebook.svg
246246
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1771,7 +1771,7 @@
17711771
supportsNormalization: false
17721772
supportsDBT: false
17731773
supported_destination_sync_modes: []
1774-
- dockerImage: "airbyte/source-facebook-marketing:0.2.48"
1774+
- dockerImage: "airbyte/source-facebook-marketing:0.2.49"
17751775
spec:
17761776
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
17771777
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"

airbyte-integrations/connectors/source-facebook-marketing/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

1515

16-
LABEL io.airbyte.version=0.2.48
16+
LABEL io.airbyte.version=0.2.49
1717
LABEL io.airbyte.name=airbyte/source-facebook-marketing

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,20 @@ def read_records(
102102
stream_state: Mapping[str, Any] = None,
103103
) -> Iterable[Mapping[str, Any]]:
104104
"""Waits for current job to finish (slice) and yield its result"""
105+
106+
today = pendulum.today(tz="UTC").date()
107+
date_start = stream_state and stream_state.get("date_start")
108+
if date_start:
109+
date_start = pendulum.parse(date_start).date()
110+
105111
job = stream_slice["insight_job"]
106112
for obj in job.get_result():
107-
yield obj.export_all_data()
113+
record = obj.export_all_data()
114+
if date_start:
115+
updated_time = pendulum.parse(record["updated_time"]).date()
116+
if updated_time <= date_start or updated_time >= today:
117+
continue
118+
yield record
108119

109120
self._completed_slices.add(job.interval.start)
110121
if job.interval.start == self._next_cursor_value:
@@ -152,9 +163,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
152163

153164
def _date_intervals(self) -> Iterator[pendulum.Date]:
    """Yield the start dates of the periods that still need to be synced."""
    # Cap the range at yesterday (UTC): today's insights are still partial and
    # would be re-read on the next sync, producing duplicate records.
    last_full_day = min(self._end_date, pendulum.yesterday(tz="UTC").date())
    if last_full_day >= self._next_cursor_value:
        span = last_full_day - self._next_cursor_value
        yield from span.range("days", self.time_increment)
159172

160173
def _advance_cursor(self):
@@ -173,9 +186,14 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
173186
:return:
174187
"""
175188

189+
today = pendulum.today(tz="UTC").date()
190+
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
191+
176192
for ts_start in self._date_intervals():
177193
if ts_start in self._completed_slices:
178-
continue
194+
if ts_start < refresh_date:
195+
continue
196+
self._completed_slices.remove(ts_start)
179197
ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
180198
interval = pendulum.Period(ts_start, ts_end)
181199
yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params)
@@ -215,7 +233,7 @@ def _get_start_date(self) -> pendulum.Date:
215233
216234
:return: the first date to sync
217235
"""
218-
today = pendulum.today().date()
236+
today = pendulum.today(tz="UTC").date()
219237
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
220238
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
221239

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py

+12
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,15 @@ def api_fixture(some_config, requests_mock, fb_account_response):
5454
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response])
5555
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response])
5656
return api
57+
58+
59+
@fixture
def set_today(mocker, monkeypatch):
    """Factory fixture that freezes pendulum's notion of "now".

    Returns a callable taking an ISO date string; invoking it patches
    ``pendulum.today`` and ``pendulum.yesterday`` to fixed values and returns
    the ``(yesterday, today)`` pair for use in assertions.
    """

    def freeze(date: str):
        frozen_today = pendulum.parse(date)
        frozen_yesterday = frozen_today - pendulum.duration(days=1)
        # NOTE(review): the MagicMock stand-ins ignore the tz argument that the
        # production code passes to pendulum.today/yesterday — fine for tests.
        monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=frozen_today))
        monkeypatch.setattr(pendulum, "yesterday", mocker.MagicMock(return_value=frozen_yesterday))
        return frozen_yesterday, frozen_today

    return freeze
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#
2+
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
6+
from typing import Any, MutableMapping
7+
8+
from airbyte_cdk.models import SyncMode
9+
from airbyte_cdk.sources.streams import Stream
10+
11+
12+
def read_full_refresh(stream_instance: Stream):
    """Drain *stream_instance* in full-refresh mode and return every record as a list."""
    records = []
    # Renamed loop variable: the original `slice` shadowed the builtin.
    # `extend` accepts any iterable, so the intermediate `list(...)` was dropped.
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.full_refresh):
        records.extend(stream_instance.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh))
    return records
18+
19+
20+
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
    """Drain *stream_instance* in incremental mode and return every record as a list.

    The caller's *stream_state* mapping is cleared and refilled in place with the
    stream's final state, so it can be fed back into a subsequent sync.
    """
    records = []
    stream_instance.state = stream_state
    # Renamed loop variable: the original `slice` shadowed the builtin.
    # `extend` accepts any iterable, so the intermediate `list(...)` was dropped.
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state):
        records.extend(stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state))
    stream_state.clear()
    stream_state.update(stream_instance.state)
    return records
29+
30+
31+
class FakeInsightAsyncJobManager:
    """Test double for InsightAsyncJobManager: reports pre-baked jobs as already completed."""

    def __init__(self, jobs, **kwargs):
        # Extra keyword arguments (api, etc.) are accepted for signature
        # compatibility with the real manager and ignored.
        self.jobs = jobs

    def completed_jobs(self):
        """Yield each pre-configured job as though it had finished successfully."""
        for job in self.jobs:
            yield job
37+
38+
39+
class FakeInsightAsyncJob:
    """Test double for InsightAsyncJob that produces one synthetic insight record.

    ``updated_insights`` is deliberately class-level (shared) state: a test can
    register "this date_start was last updated at this time" once and every job
    instance — past or future — will see it when exporting data.
    """

    # Maps date_start (str) -> updated_time (str); shared across all instances.
    updated_insights = {}

    @classmethod
    def update_insight(cls, date_start, updated_time):
        """Record that the insight for *date_start* was last updated at *updated_time*."""
        cls.updated_insights[date_start] = updated_time

    def __init__(self, interval, **kwargs):
        # Extra keyword arguments (api, edge_object, params) are ignored.
        self.interval = interval

    def get_result(self):
        """The job itself serves as its single result row."""
        return [self]

    def export_all_data(self):
        """Render the fake insight; updated_time defaults to date_start itself."""
        start = str(self.interval.start)
        return {"date_start": start, "updated_time": self.updated_insights.get(start, start)}

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_insight_streams.py

+73-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import pendulum
88
import pytest
9+
import source_facebook_marketing.streams.base_insight_streams
910
from airbyte_cdk.models import SyncMode
11+
from helpers import FakeInsightAsyncJob, FakeInsightAsyncJobManager, read_full_refresh, read_incremental
1012
from pendulum import duration
1113
from source_facebook_marketing.streams import AdsInsights
1214
from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
@@ -186,7 +188,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
186188
async_manager_mock.assert_called_once()
187189
args, kwargs = async_manager_mock.call_args
188190
generated_jobs = list(kwargs["jobs"])
189-
assert len(generated_jobs) == (end_date - start_date).days + 1
191+
assert len(generated_jobs) == (end_date - start_date).days
190192
assert generated_jobs[0].interval.start == start_date.date()
191193
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
192194

@@ -223,7 +225,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
223225
async_manager_mock.assert_called_once()
224226
args, kwargs = async_manager_mock.call_args
225227
generated_jobs = list(kwargs["jobs"])
226-
assert len(generated_jobs) == (end_date - start_date).days + 1
228+
assert len(generated_jobs) == (end_date - start_date).days
227229
assert generated_jobs[0].interval.start == start_date.date()
228230
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
229231

@@ -290,3 +292,72 @@ def test_fields_custom(self, api):
290292
)
291293

292294
assert stream.fields == ["account_id", "account_currency"]
295+
296+
def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
    """Completed slices inside the lookback window are re-synced and dropped from state."""
    set_today("2020-04-01")
    monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10))
    monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
    monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

    # With today frozen at 2020-04-01 and a 10-day lookback, 2020-03-21 is the
    # only completed slice outside the refresh window; 03-22 and 03-23 fall
    # inside it and must be scheduled again.
    state = {
        AdsInsights.cursor_field: "2020-03-19",
        "slices": ["2020-03-21", "2020-03-22", "2020-03-23"],
        "time_increment": 1,
    }

    stream = AdsInsights(api=api, start_date=pendulum.parse("2020-03-01"), end_date=pendulum.parse("2020-05-01"))
    stream.state = state
    assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)}

    job_starts = [item["insight_job"].interval.start for item in stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)]

    assert pendulum.parse("2020-03-21").date() not in job_starts
    assert pendulum.parse("2020-03-22").date() in job_starts
    assert pendulum.parse("2020-03-23").date() in job_starts
    assert stream._completed_slices == {pendulum.Date(2020, 3, 21)}
325+
326+
def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
    """Incremental reads re-fetch the lookback window but emit only updated records."""
    start_date = pendulum.parse("2020-03-01")
    end_date = pendulum.parse("2020-05-01")
    yesterday, _ = set_today("2020-04-01")
    monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20))
    monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
    monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

    stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)

    # Full refresh: one record per day from start_date through yesterday inclusive.
    records = read_full_refresh(stream)
    assert len(records) == (yesterday - start_date).days + 1
    assert records[0]["date_start"] == str(start_date.date())
    assert records[-1]["date_start"] == str(yesterday.date())

    # Incremental from a mid-range cursor: records begin the day after the cursor.
    state = {AdsInsights.cursor_field: "2020-03-20", "time_increment": 1}
    records = read_incremental(stream, state)
    assert len(records) == (yesterday - pendulum.parse("2020-03-20")).days
    assert records[0]["date_start"] == "2020-03-21"
    assert records[-1]["date_start"] == str(yesterday.date())
    assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}

    # One day later: only the newly completed day is emitted.
    yesterday, _ = set_today("2020-04-02")
    records = read_incremental(stream, state)
    assert records == [{"date_start": str(yesterday.date()), "updated_time": str(yesterday.date())}]
    assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}

    # Another day later, with some past insights marked as updated: only insights
    # whose updated_time falls strictly between the state cursor and today are
    # re-emitted (03-26 updated too early, 03-28 updated "today" — both skipped).
    yesterday, _ = set_today("2020-04-03")
    FakeInsightAsyncJob.update_insight("2020-03-26", "2020-04-01")
    FakeInsightAsyncJob.update_insight("2020-03-27", "2020-04-02")
    FakeInsightAsyncJob.update_insight("2020-03-28", "2020-04-03")

    records = read_incremental(stream, state)
    assert records == [
        {"date_start": "2020-03-27", "updated_time": "2020-04-02"},
        {"date_start": "2020-04-02", "updated_time": "2020-04-02"},
    ]
    assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}

docs/integrations/sources/facebook-marketing.md

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev
108108

109109
| Version | Date | Pull Request | Subject |
110110
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
111+
| 0.2.49 | 2022-05-20 | [13047](https://github.com/airbytehq/airbyte/pull/13047) | Fix duplicating records during insights lookback period |
111112
| 0.2.48 | 2022-05-19 | [13008](https://github.com/airbytehq/airbyte/pull/13008) | Update CDK to v0.1.58 avoid crashing on incorrect stream schemas |
112113
| 0.2.47 | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions |
113114
| 0.2.46 | 2022-04-22 | [12171](https://github.com/airbytehq/airbyte/pull/12171) | Allow configuration of page_size for requests |

0 commit comments

Comments
 (0)