Skip to content

Commit 76032e6

Browse files
Source Facebook Marketing: Add lookback window to insights streams (#12402)
1 parent c55f185 commit 76032e6

File tree

9 files changed

+95
-36
lines changed

9 files changed

+95
-36
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@
240240
- name: Facebook Marketing
241241
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
242242
dockerRepository: airbyte/source-facebook-marketing
243-
dockerImageTag: 0.2.49
243+
dockerImageTag: 0.2.50
244244
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
245245
icon: facebook.svg
246246
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+18-1
Original file line numberDiff line numberDiff line change
@@ -1838,7 +1838,7 @@
18381838
supportsNormalization: false
18391839
supportsDBT: false
18401840
supported_destination_sync_modes: []
1841-
- dockerImage: "airbyte/source-facebook-marketing:0.2.49"
1841+
- dockerImage: "airbyte/source-facebook-marketing:0.2.50"
18421842
spec:
18431843
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
18441844
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
@@ -2141,6 +2141,14 @@
21412141
- "2017-01-26T00:00:00Z"
21422142
type: "string"
21432143
format: "date-time"
2144+
insights_lookback_window:
2145+
title: "Custom Insights Lookback Window"
2146+
description: "The attribution window"
2147+
default: 28
2148+
maximum: 28
2149+
mininum: 1
2150+
exclusiveMinimum: 0
2151+
type: "integer"
21442152
required:
21452153
- "name"
21462154
page_size:
@@ -2153,6 +2161,15 @@
21532161
order: 7
21542162
exclusiveMinimum: 0
21552163
type: "integer"
2164+
insights_lookback_window:
2165+
title: "Insights Lookback Window"
2166+
description: "The attribution window"
2167+
default: 28
2168+
order: 8
2169+
maximum: 28
2170+
mininum: 1
2171+
exclusiveMinimum: 0
2172+
type: "integer"
21562173
required:
21572174
- "account_id"
21582175
- "start_date"

airbyte-integrations/connectors/source-facebook-marketing/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

1515

16-
LABEL io.airbyte.version=0.2.49
16+
LABEL io.airbyte.version=0.2.50
1717
LABEL io.airbyte.name=airbyte/source-facebook-marketing

airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json

+19
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,15 @@
289289
"examples": ["2017-01-26T00:00:00Z"],
290290
"type": "string",
291291
"format": "date-time"
292+
},
293+
"insights_lookback_window": {
294+
"title": "Custom Insights Lookback Window",
295+
"description": "The attribution window",
296+
"default": 28,
297+
"maximum": 28,
298+
"mininum": 1,
299+
"exclusiveMinimum": 0,
300+
"type": "integer"
292301
}
293302
},
294303
"required": ["name"]
@@ -301,6 +310,16 @@
301310
"order": 7,
302311
"exclusiveMinimum": 0,
303312
"type": "integer"
313+
},
314+
"insights_lookback_window": {
315+
"title": "Insights Lookback Window",
316+
"description": "The attribution window",
317+
"default": 28,
318+
"order": 8,
319+
"maximum": 28,
320+
"mininum": 1,
321+
"exclusiveMinimum": 0,
322+
"type": "integer"
304323
}
305324
},
306325
"required": ["account_id", "start_date", "access_token"]

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
6161
api = API(account_id=config.account_id, access_token=config.access_token)
6262

6363
insights_args = dict(
64-
api=api,
65-
start_date=config.start_date,
66-
end_date=config.end_date,
64+
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
6765
)
6866
streams = [
6967
AdAccount(api=api),
@@ -159,6 +157,7 @@ def _update_insights_streams(self, insights: List[InsightConfig], default_args,
159157
time_increment=insight.time_increment,
160158
start_date=insight.start_date or default_args["start_date"],
161159
end_date=insight.end_date or default_args["end_date"],
160+
insights_lookback_window=insight.insights_lookback_window or default_args["insights_lookback_window"],
162161
)
163162
insight_stream = AdsInsights(**args)
164163
insights_custom_streams.append(insight_stream)

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py

+16
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ class Config:
7777
pattern=DATE_TIME_PATTERN,
7878
examples=["2017-01-26T00:00:00Z"],
7979
)
80+
insights_lookback_window: Optional[PositiveInt] = Field(
81+
title="Custom Insights Lookback Window",
82+
description="The attribution window",
83+
maximum=28,
84+
mininum=1,
85+
default=28,
86+
)
8087

8188

8289
class ConnectorConfig(BaseConfig):
@@ -156,3 +163,12 @@ class Config:
156163
"Page size used when sending requests to Facebook API to specify number of records per page when response has pagination. Most users do not need to set this field unless they specifically need to tune the connector to address specific issues or use cases."
157164
),
158165
)
166+
167+
insights_lookback_window: Optional[PositiveInt] = Field(
168+
title="Insights Lookback Window",
169+
order=8,
170+
description="The attribution window",
171+
maximum=28,
172+
mininum=1,
173+
default=28,
174+
)

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py

+15-8
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ class AdsInsights(FBMarketingIncrementalStream):
4646
# HTTP response.
4747
# https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
4848
INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)
49-
# Facebook freezes insight data 28 days after it was generated, which means that all data
50-
# from the past 28 days may have changed since we last emitted it, so we retrieve it again.
51-
INSIGHTS_LOOKBACK_PERIOD = pendulum.duration(days=28)
5249

5350
action_breakdowns = ALL_ACTION_BREAKDOWNS
5451
level = "ad"
@@ -64,6 +61,7 @@ def __init__(
6461
breakdowns: List[str] = None,
6562
action_breakdowns: List[str] = None,
6663
time_increment: Optional[int] = None,
64+
insights_lookback_window: int = None,
6765
**kwargs,
6866
):
6967
super().__init__(**kwargs)
@@ -74,6 +72,7 @@ def __init__(
7472
self.breakdowns = breakdowns or self.breakdowns
7573
self.time_increment = time_increment or self.time_increment
7674
self._new_class_name = name
75+
self._insights_lookback_window = insights_lookback_window
7776

7877
# state
7978
self._cursor_value: Optional[pendulum.Date] = None # latest period that was read
@@ -91,6 +90,16 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
9190
"""Build complex PK based on slices and breakdowns"""
9291
return ["date_start", "account_id", "ad_id"] + self.breakdowns
9392

93+
@property
94+
def insights_lookback_period(self):
95+
"""
96+
Facebook freezes insight data 28 days after it was generated, which means that all data
97+
from the past 28 days may have changed since we last emitted it, so we retrieve it again.
98+
But in some cases users my have define their own lookback window, thats
99+
why the value for `insights_lookback_window` is set throught config.
100+
"""
101+
return pendulum.duration(days=self._insights_lookback_window)
102+
94103
def list_objects(self, params: Mapping[str, Any]) -> Iterable:
95104
"""Because insights has very different read_records we don't need this method anymore"""
96105

@@ -187,7 +196,7 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
187196
"""
188197

189198
today = pendulum.today(tz="UTC").date()
190-
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
199+
refresh_date = today - self.insights_lookback_period
191200

192201
for ts_start in self._date_intervals():
193202
if ts_start in self._completed_slices:
@@ -235,13 +244,12 @@ def _get_start_date(self) -> pendulum.Date:
235244
"""
236245
today = pendulum.today(tz="UTC").date()
237246
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
238-
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
239-
247+
refresh_date = today - self.insights_lookback_period
240248
if self._cursor_value:
241249
start_date = self._cursor_value + pendulum.duration(days=self.time_increment)
242250
if start_date > refresh_date:
243251
logger.info(
244-
f"The cursor value within refresh period ({self.INSIGHTS_LOOKBACK_PERIOD}), start sync from {refresh_date} instead."
252+
f"The cursor value within refresh period ({self.insights_lookback_period}), start sync from {refresh_date} instead."
245253
)
246254
start_date = min(start_date, refresh_date)
247255

@@ -252,7 +260,6 @@ def _get_start_date(self) -> pendulum.Date:
252260
start_date = self._start_date
253261
if start_date < oldest_date:
254262
logger.warning(f"Loading insights older then {self.INSIGHTS_RETENTION_PERIOD} is not possible. Start sync from {oldest_date}.")
255-
256263
return max(oldest_date, start_date)
257264

258265
def request_params(self, **kwargs) -> MutableMapping[str, Any]:

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_insight_streams.py

+22-22
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def async_job_mock_fixture(mocker):
5151

5252
class TestBaseInsightsStream:
5353
def test_init(self, api):
54-
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1))
54+
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)
5555

5656
assert not stream.breakdowns
5757
assert stream.action_breakdowns == AdsInsights.ALL_ACTION_BREAKDOWNS
@@ -66,6 +66,7 @@ def test_init_override(self, api):
6666
name="CustomName",
6767
breakdowns=["test1", "test2"],
6868
action_breakdowns=["field1", "field2"],
69+
insights_lookback_window=28,
6970
)
7071

7172
assert stream.breakdowns == ["test1", "test2"]
@@ -85,6 +86,7 @@ def test_read_records_all(self, mocker, api):
8586
api=api,
8687
start_date=datetime(2010, 1, 1),
8788
end_date=datetime(2011, 1, 1),
89+
insights_lookback_window=28,
8890
)
8991

9092
records = list(
@@ -104,11 +106,7 @@ def test_read_records_random_order(self, mocker, api):
104106
job = mocker.Mock(spec=AsyncJob)
105107
job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()]
106108
job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1))
107-
stream = AdsInsights(
108-
api=api,
109-
start_date=datetime(2010, 1, 1),
110-
end_date=datetime(2011, 1, 1),
111-
)
109+
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)
112110

113111
records = list(
114112
stream.read_records(
@@ -143,11 +141,7 @@ def test_read_records_random_order(self, mocker, api):
143141
)
144142
def test_state(self, api, state):
145143
"""State setter/getter should work with all combinations"""
146-
stream = AdsInsights(
147-
api=api,
148-
start_date=datetime(2010, 1, 1),
149-
end_date=datetime(2011, 1, 1),
150-
)
144+
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)
151145

152146
assert stream.state == {}
153147

@@ -162,7 +156,7 @@ def test_state(self, api, state):
162156
def test_stream_slices_no_state(self, api, async_manager_mock, start_date):
163157
"""Stream will use start_date when there is not state"""
164158
end_date = start_date + duration(weeks=2)
165-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
159+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
166160
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
167161

168162
slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
@@ -179,7 +173,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
179173
"""Stream will use start_date when there is not state and start_date within 28d from now"""
180174
start_date = recent_start_date
181175
end_date = pendulum.now()
182-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
176+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
183177
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
184178

185179
slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
@@ -197,7 +191,7 @@ def test_stream_slices_with_state(self, api, async_manager_mock, start_date):
197191
end_date = start_date + duration(days=10)
198192
cursor_value = start_date + duration(days=5)
199193
state = {AdsInsights.cursor_field: cursor_value.date().isoformat()}
200-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
194+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
201195
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
202196

203197
slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
@@ -216,7 +210,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
216210
end_date = pendulum.now()
217211
cursor_value = end_date - duration(days=1)
218212
state = {AdsInsights.cursor_field: cursor_value.date().isoformat()}
219-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
213+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
220214
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
221215

222216
slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
@@ -237,7 +231,7 @@ def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, star
237231
AdsInsights.cursor_field: cursor_value.date().isoformat(),
238232
"slices": [(cursor_value + duration(days=1)).date().isoformat(), (cursor_value + duration(days=3)).date().isoformat()],
239233
}
240-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
234+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
241235
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
242236

243237
slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
@@ -251,7 +245,7 @@ def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, star
251245
assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=4)
252246

253247
def test_get_json_schema(self, api):
254-
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1))
248+
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)
255249

256250
schema = stream.get_json_schema()
257251

@@ -261,7 +255,11 @@ def test_get_json_schema(self, api):
261255

262256
def test_get_json_schema_custom(self, api):
263257
stream = AdsInsights(
264-
api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), breakdowns=["device_platform", "country"]
258+
api=api,
259+
start_date=datetime(2010, 1, 1),
260+
end_date=datetime(2011, 1, 1),
261+
breakdowns=["device_platform", "country"],
262+
insights_lookback_window=28,
265263
)
266264

267265
schema = stream.get_json_schema()
@@ -275,6 +273,7 @@ def test_fields(self, api):
275273
api=api,
276274
start_date=datetime(2010, 1, 1),
277275
end_date=datetime(2011, 1, 1),
276+
insights_lookback_window=28,
278277
)
279278

280279
fields = stream.fields
@@ -289,6 +288,7 @@ def test_fields_custom(self, api):
289288
start_date=datetime(2010, 1, 1),
290289
end_date=datetime(2011, 1, 1),
291290
fields=["account_id", "account_currency"],
291+
insights_lookback_window=28,
292292
)
293293

294294
assert stream.fields == ["account_id", "account_currency"]
@@ -297,7 +297,7 @@ def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
297297
start_date = pendulum.parse("2020-03-01")
298298
end_date = pendulum.parse("2020-05-01")
299299
set_today("2020-04-01")
300-
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10))
300+
301301
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
302302
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
303303

@@ -311,7 +311,7 @@ def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
311311
"time_increment": 1,
312312
}
313313

314-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
314+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=10)
315315
stream.state = state
316316
assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)}
317317

@@ -327,11 +327,11 @@ def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
327327
start_date = pendulum.parse("2020-03-01")
328328
end_date = pendulum.parse("2020-05-01")
329329
yesterday, _ = set_today("2020-04-01")
330-
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20))
330+
331331
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
332332
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
333333

334-
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
334+
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=20)
335335

336336
records = read_full_refresh(stream)
337337
assert len(records) == (yesterday - start_date).days + 1

docs/integrations/sources/facebook-marketing.md

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev
108108

109109
| Version | Date | Pull Request | Subject |
110110
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
111+
| 0.2.50 | 2022-04-27 | [12402](https://github.com/airbytehq/airbyte/pull/12402) | Add lookback window to insights streams |
111112
| 0.2.49 | 2022-05-20 | [13047](https://github.com/airbytehq/airbyte/pull/13047) | Fix duplicating records during insights lookback period |
112113
| 0.2.48 | 2022-05-19 | [13008](https://github.com/airbytehq/airbyte/pull/13008) | Update CDK to v0.1.58 avoid crashing on incorrect stream schemas |
113114
| 0.2.47 | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions |

0 commit comments

Comments
 (0)