Skip to content

Commit 7ba1169

Browse files
author
Dmytro Rezchykov
committed
Facebook Marketing performance improvement
1 parent 15fe4dd commit 7ba1169

File tree

5 files changed

+153
-107
lines changed

5 files changed

+153
-107
lines changed

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py

+15
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ class MyFacebookAdsApi(FacebookAdsApi):
2323
call_rate_threshold = 90 # maximum percentage of call limit utilization
2424
pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit
2525

26+
    # Fraction of the ads-insights call-rate limit currently in use,
    # in the range 0.0–1.0; stays None until the first API response
    # carrying the throttle header has been seen.
    _ads_insights_throttle: float = None

    @property
    def ads_insights_throttle(self):
        """Return the last observed ads-insights throttle value (0.0–1.0 or None)."""
        return self._ads_insights_throttle

    @ads_insights_throttle.setter
    def ads_insights_throttle(self, value):
        """Record the throttle value parsed from an API response header."""
        self._ads_insights_throttle = value
36+
2637
@staticmethod
2738
def parse_call_rate_header(headers):
2839
usage = 0
@@ -90,6 +101,10 @@ def call(
90101
):
91102
"""Makes an API call, delegate actual work to parent class and handles call rates"""
92103
response = super().call(method, path, params, headers, files, url_override, api_version)
104+
ads_insights_throttle = response.headers().get("x-fb-ads-insights-throttle")
105+
if ads_insights_throttle:
106+
ads_insights_throttle = json.loads(ads_insights_throttle)
107+
self.ads_insights_throttle = max(ads_insights_throttle.get("app_id_util_pct"), ads_insights_throttle.get("acc_id_util_pct"))
93108
self.handle_call_rate_limit(response, params)
94109
return response
95110

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py

+11-23
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,29 @@
88

99
import backoff
1010
import pendulum
11+
from facebook_business.adobjects.adreportrun import AdReportRun
1112
from facebook_business.exceptions import FacebookRequestError
1213
from source_facebook_marketing.api import API
1314

14-
from .common import JobException, JobTimeoutException, retry_pattern
15+
from .common import JobException, retry_pattern
1516

1617
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
1718
logger = logging.getLogger("airbyte")
1819

1920

20-
class Status(Enum):
21+
class Status(str, Enum):
    """Async job statuses.

    Values mirror the ``async_status`` strings returned by the Facebook
    AdReportRun API; inheriting from ``str`` lets them compare equal to the
    raw strings.
    """

    COMPLETED = "Job Completed"
    FAILED = "Job Failed"
    SKIPPED = "Job Skipped"
    STARTED = "Job Started"
    NOT_STARTED = "Job Not Started"
2729

2830

2931
class AsyncJob:
3032
"""AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job"""
3133

32-
MAX_WAIT_TO_START = pendulum.duration(minutes=5)
33-
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
34-
3534
def __init__(self, api: API, params: Mapping[str, Any]):
3635
"""Initialize
3736
@@ -40,7 +39,7 @@ def __init__(self, api: API, params: Mapping[str, Any]):
4039
"""
4140
self._params = params
4241
self._api = api
43-
self._job = None
42+
self._job: AdReportRun = None
4443
self._start_time = None
4544
self._finish_time = None
4645
self._failed = False
@@ -90,7 +89,7 @@ def completed(self) -> bool:
9089
return self._check_status()
9190
except JobException:
9291
self._failed = True
93-
raise
92+
return True
9493

9594
@property
9695
def failed(self) -> bool:
@@ -113,25 +112,14 @@ def _check_status(self) -> bool:
113112
job_progress_pct = self._job["async_percent_completion"]
114113
logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})")
115114
runtime = self.elapsed_time
115+
job_status = Status(self._job["async_status"])
116116

117-
if self._job["async_status"] == Status.COMPLETED.value:
117+
if job_status == Status.COMPLETED:
118118
self._finish_time = pendulum.now()
119119
return True
120-
elif self._job["async_status"] == Status.FAILED.value:
121-
raise JobException(f"{self._job} failed after {runtime.in_seconds()} seconds.")
122-
elif self._job["async_status"] == Status.SKIPPED.value:
123-
raise JobException(f"{self._job} skipped after {runtime.in_seconds()} seconds.")
124-
125-
if runtime > self.MAX_WAIT_TO_START and self._job["async_percent_completion"] == 0:
126-
raise JobTimeoutException(
127-
f"{self._job} did not start after {runtime.in_seconds()} seconds."
128-
f" This is an intermittent error which may be fixed by retrying the job. Aborting."
129-
)
130-
elif runtime > self.MAX_WAIT_TO_FINISH:
131-
raise JobTimeoutException(
132-
f"{self._job} did not finish after {runtime.in_seconds()} seconds."
133-
f" This is an intermittent error which may be fixed by retrying the job. Aborting."
134-
)
120+
elif job_status in [Status.FAILED, Status.SKIPPED]:
121+
raise JobException(f"{self._job} has status {job_status} after {runtime.in_seconds()} seconds.")
122+
135123
return False
136124

137125
@backoff_policy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#
2+
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
#
4+
import logging
import time
from collections import deque
from dataclasses import dataclass, field

import pendulum
from source_facebook_marketing.api import API

from .async_job import AsyncJob
15+
16+
@dataclass
class InsightsAsyncJobManager:
    """Schedules and tracks Facebook AdsInsights async report jobs.

    Jobs are started for consecutive ``start_days_per_job``-day chunks of the
    [``from_date``, ``to_date``] period while the account's insights throttle
    stays below ``THROTTLE_LIMIT``, and are consumed in FIFO order.
    """

    logger = logging.getLogger("airbyte")
    api: API  # wrapper exposing ``.api`` (FB SDK instance) and ``.account``
    from_date: pendulum.Date  # first date (inclusive) still to be scheduled
    to_date: pendulum.Date  # last date of the sync window
    start_days_per_job: int  # size, in days, of each job's time_range
    job_params: dict  # base request params merged into every job
    # default_factory prevents one deque being shared by every instance
    # (a bare ``deque()`` default is also rejected by dataclass on Python 3.11+)
    _jobs_queue: deque = field(default_factory=deque)

    # Maximum fraction (0.0–1.0) of the insights call-rate budget we allow
    # before pausing creation of new jobs.
    THROTTLE_LIMIT = 0.7
    FAILED_JOBS_RESTART_COUNT = 5
    JOB_STATUS_UPDATE_SLEEP_SECONDS = 30

    def done(self) -> bool:
        """Return True when no scheduled jobs remain to be consumed."""
        return len(self._jobs_queue) == 0

    def _current_throttle(self) -> float:
        """Throttle value from the last API response; None means no reading yet,
        which we treat as "not throttled"."""
        return self.api.api.ads_insights_throttle or 0.0

    def get_next_range(self):
        """Return the next ``time_range`` chunk and advance ``from_date`` past it."""
        until = min(
            self.from_date + pendulum.Duration(days=self.start_days_per_job),
            self.to_date,
        )
        try:
            return {
                "time_range": {
                    "since": self.from_date.to_date_string(),
                    "until": until.to_date_string(),
                },
            }
        finally:
            # advance unconditionally; time_range boundaries are inclusive,
            # so the next chunk starts the day after ``until``
            self.from_date = until.add(days=1)

    def no_more_ranges(self) -> bool:
        """True once the whole sync period has been handed out as job ranges."""
        return self.from_date >= self.to_date

    def add_async_jobs(self):
        """Start new async jobs while the API throttle allows and ranges remain."""
        if self.no_more_ranges():
            return
        self._update_api_throttle_limit()
        while self._current_throttle() < self.THROTTLE_LIMIT and not self.no_more_ranges():
            next_range = self.get_next_range()
            params = {**self.job_params, **next_range}
            job = AsyncJob(api=self.api, params=params)
            job.start()
            self._jobs_queue.append(job)
        self.logger.info(f"Current throttle limit is {self._current_throttle()}")

    def get_next_completed_job(self) -> AsyncJob:
        """Block until the oldest queued job completes, restarting it on failure.

        Raises:
            Exception: if the job still fails after ``FAILED_JOBS_RESTART_COUNT``
                restart attempts.
        """
        job = self._jobs_queue[0]
        for _ in range(self.FAILED_JOBS_RESTART_COUNT):
            while not job.completed:
                self.logger.info(f"Job {job} is not ready, wait for {self.JOB_STATUS_UPDATE_SLEEP_SECONDS}")
                time.sleep(self.JOB_STATUS_UPDATE_SLEEP_SECONDS)
            if job.failed:
                # BUG FIX: ``self.logger(...)`` invoked the Logger object itself;
                # a restarted job must also be waited on again, not returned.
                self.logger.info(f"Job {job} failed, restarting")
                job.restart()
            else:
                # top up the queue while we are here, then hand over the result
                self.add_async_jobs()
                return self._jobs_queue.popleft()
        # TODO: Break range into smaller parts if job failing constantly
        raise Exception(f"{job} failed")

    def _update_api_throttle_limit(self):
        """Issue a cheap insights request so the API wrapper refreshes its
        throttle reading (parsed from the ``x-fb-ads-insights-throttle``
        response header by ``MyFacebookAdsApi.call``)."""
        self.api.account.get_insights()

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py

+34-69
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
33
#
44

5-
import time
65
import urllib.parse as urlparse
76
from abc import ABC
8-
from collections import deque
97
from datetime import datetime
108
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
119

@@ -21,9 +19,9 @@
2119
from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
2220
from facebook_business.exceptions import FacebookRequestError
2321
from source_facebook_marketing.api import API
22+
from source_facebook_marketing.async_job_manager import InsightsAsyncJobManager
2423

25-
from .async_job import AsyncJob
26-
from .common import FacebookAPIException, JobException, batch, deep_merge, retry_pattern
24+
from .common import FacebookAPIException, batch, deep_merge, retry_pattern
2725

2826
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
2927

@@ -291,8 +289,8 @@ class AdsInsights(FBMarketingIncrementalStream):
291289
]
292290

293291
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
294-
MAX_ASYNC_JOBS = 10
295-
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)
292+
MAX_ASYNC_JOBS = 1000
293+
INSIGHTS_RETENTION_PERIOD_MONTHES = 37
296294

297295
action_breakdowns = ALL_ACTION_BREAKDOWNS
298296
level = "ad"
@@ -314,7 +312,7 @@ def __init__(
314312

315313
super().__init__(**kwargs)
316314
self.lookback_window = pendulum.duration(days=buffer_days)
317-
self._days_per_job = days_per_job
315+
self._days_per_job = 5 # days_per_job
318316
self._fields = fields
319317
self.action_breakdowns = action_breakdowns or self.action_breakdowns
320318
self.breakdowns = breakdowns or self.breakdowns
@@ -336,7 +334,7 @@ def read_records(
336334
stream_state: Mapping[str, Any] = None,
337335
) -> Iterable[Mapping[str, Any]]:
338336
"""Waits for current job to finish (slice) and yield its result"""
339-
job = self.wait_for_job(stream_slice["job"])
337+
job = stream_slice["job"]
340338
# because we query `lookback_window` days before actual cursor we might get records older then cursor
341339

342340
for obj in job.get_result():
@@ -349,50 +347,37 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
349347
2. we should run as many job as possible before checking for result
350348
3. we shouldn't proceed to consumption of the next job before previous succeed
351349
"""
352-
stream_state = stream_state or {}
353-
running_jobs = deque()
354-
date_ranges = list(self._date_ranges(stream_state=stream_state))
355-
for params in date_ranges:
356-
params = deep_merge(params, self.request_params(stream_state=stream_state))
357-
job = AsyncJob(api=self._api, params=params)
358-
job.start()
359-
running_jobs.append(job)
360-
if len(running_jobs) >= self.MAX_ASYNC_JOBS:
361-
yield {"job": running_jobs.popleft()}
362-
363-
while running_jobs:
364-
yield {"job": running_jobs.popleft()}
365-
366-
@retry_pattern(backoff.expo, JobException, max_tries=10, factor=5)
367-
def wait_for_job(self, job: AsyncJob) -> AsyncJob:
368-
if job.failed:
369-
job.restart()
370-
371-
factor = 2
372-
sleep_seconds = factor
373-
while not job.completed:
374-
self.logger.info(f"{job}: sleeping {sleep_seconds} seconds while waiting for completion")
375-
time.sleep(sleep_seconds)
376-
if sleep_seconds < self.MAX_ASYNC_SLEEP.in_seconds():
377-
sleep_seconds *= factor
378-
379-
return job
380350

381-
def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
382-
params = super().request_params(stream_state=stream_state, **kwargs)
383-
params = deep_merge(
384-
params,
385-
{
386-
"level": self.level,
387-
"action_breakdowns": self.action_breakdowns,
388-
"breakdowns": self.breakdowns,
389-
"fields": self.fields,
390-
"time_increment": self.time_increment,
391-
"action_attribution_windows": self.action_attribution_windows,
392-
},
351+
job_params = self.request_params(stream_state=stream_state)
352+
job_manager = InsightsAsyncJobManager(
353+
api=self._api,
354+
job_params=job_params,
355+
start_days_per_job=5,
356+
from_date=self.get_start_date(stream_state),
357+
to_date=self._end_date,
393358
)
359+
job_manager.add_async_jobs()
394360

395-
return params
361+
while not job_manager.done():
362+
yield {"job": job_manager.get_next_completed_job()}
363+
364+
    def get_start_date(self, stream_state: Mapping[str, Any]) -> pendulum.Date:
        """Resolve the date from which insights should be synced.

        Uses the saved cursor minus ``lookback_window`` when state exists
        (recent insight data may still change and must be re-fetched),
        otherwise the configured start date. The result is clamped so it
        never falls outside the insights retention window relative to
        ``self._end_date``.
        """
        state_value = stream_state.get(self.cursor_field) if stream_state else None
        if state_value:
            start_date = pendulum.parse(state_value) - self.lookback_window
        else:
            start_date = self._start_date
        # NOTE(review): constant name has a typo — "MONTHES" should be "MONTHS"
        return max(self._end_date.subtract(months=self.INSIGHTS_RETENTION_PERIOD_MONTHES), start_date)
371+
372+
    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """Build the base parameter set passed to each async AdsInsights report job."""
        return {
            "level": self.level,
            "action_breakdowns": self.action_breakdowns,
            "breakdowns": self.breakdowns,
            "fields": self.fields,
            "time_increment": self.time_increment,
            "action_attribution_windows": self.action_attribution_windows,
        }
396381

397382
def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
398383
"""Works differently for insights, so remove it"""
@@ -435,26 +420,6 @@ def _schema_for_breakdowns(self) -> Mapping[str, Any]:
435420

436421
return {breakdown: schemas[breakdown] for breakdown in self.breakdowns}
437422

438-
def _date_ranges(self, stream_state: Mapping[str, Any]) -> Iterator[dict]:
439-
"""Iterate over period between start_date/state and now
440-
441-
Notes: Facebook freezes insight data 28 days after it was generated, which means that all data
442-
from the past 28 days may have changed since we last emitted it, so we retrieve it again.
443-
"""
444-
state_value = stream_state.get(self.cursor_field)
445-
if state_value:
446-
start_date = pendulum.parse(state_value) - self.lookback_window
447-
else:
448-
start_date = self._start_date
449-
end_date = self._end_date
450-
start_date = max(end_date - self.INSIGHTS_RETENTION_PERIOD, start_date)
451-
452-
for since in pendulum.period(start_date, end_date).range("days", self._days_per_job):
453-
until = min(since.add(days=self._days_per_job - 1), end_date) # -1 because time_range is inclusive
454-
yield {
455-
"time_range": {"since": since.to_date_string(), "until": until.to_date_string()},
456-
}
457-
458423

459424
class AdsInsightsAgeAndGender(AdsInsights):
460425
breakdowns = ["age", "gender"]

0 commit comments

Comments
 (0)