Skip to content

Commit 3d30180

Browse files
Phlair and octavia-squidington-iii
authored and committed
Source Facebook Marketing: Attempt to retry failing jobs that are already split to minimum size (#12390)
* restart jobs that are already split to smallest size
* manager now fails on nested jobs hitting max attempts
* version bump
* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 1906e77 commit 3d30180

File tree

8 files changed

+56
-6
lines changed

8 files changed

+56
-6
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@
210210
- name: Facebook Marketing
211211
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
212212
dockerRepository: airbyte/source-facebook-marketing
213-
dockerImageTag: 0.2.44
213+
dockerImageTag: 0.2.45
214214
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
215215
icon: facebook.svg
216216
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1748,7 +1748,7 @@
17481748
supportsNormalization: false
17491749
supportsDBT: false
17501750
supported_destination_sync_modes: []
1751-
- dockerImage: "airbyte/source-facebook-marketing:0.2.44"
1751+
- dockerImage: "airbyte/source-facebook-marketing:0.2.45"
17521752
spec:
17531753
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
17541754
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"

airbyte-integrations/connectors/source-facebook-marketing/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

1515

16-
LABEL io.airbyte.version=0.2.44
16+
LABEL io.airbyte.version=0.2.45
1717
LABEL io.airbyte.name=airbyte/source-facebook-marketing

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,13 @@ def split_job(self) -> List["AsyncJob"]:
158158
new_jobs = []
159159
for job in self._jobs:
160160
if job.failed:
161-
new_jobs.extend(job.split_job())
161+
try:
162+
new_jobs.extend(job.split_job())
163+
except ValueError as split_limit_error:
164+
logger.error(split_limit_error)
165+
logger.info(f'can\'t split "{job}" any smaller, attempting to retry the job.')
166+
job.restart()
167+
new_jobs.append(job)
162168
else:
163169
new_jobs.append(job)
164170
return new_jobs
@@ -202,7 +208,7 @@ def split_job(self) -> List["AsyncJob"]:
202208
return self._split_by_edge_class(AdSet)
203209
elif isinstance(self._edge_object, AdSet):
204210
return self._split_by_edge_class(Ad)
205-
raise RuntimeError("The job is already splitted to the smallest size.")
211+
raise ValueError("The job is already splitted to the smallest size.")
206212

207213
def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Type[Ad]]) -> List[AsyncJob]:
208214
"""Split insight job by creating insight jobs from lower edge object, i.e.

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py

+6
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
9494
self._wait_throttle_limit_down()
9595
for job in self._running_jobs:
9696
if job.failed:
97+
if isinstance(job, ParentAsyncJob):
98+
# if this job is a ParentAsyncJob, it holds X number of jobs
99+
# we want to check that none of these nested jobs have exceeded MAX_NUMBER_OF_ATTEMPTS
100+
for nested_job in job._jobs:
101+
if nested_job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
102+
raise JobException(f"{nested_job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
97103
if job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
98104
raise JobException(f"{job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
99105
elif job.attempt_number == 2:

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ def test_split_job_smallest(self, mocker, api):
336336
params = {"time_increment": 1, "breakdowns": []}
337337
job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params)
338338

339-
with pytest.raises(RuntimeError, match="The job is already splitted to the smallest size."):
339+
with pytest.raises(ValueError, match="The job is already splitted to the smallest size."):
340340
job.split_job()
341341

342342

@@ -415,5 +415,19 @@ def test_split_job(self, parent_job, grouped_jobs, mocker):
415415
else:
416416
job.split_job.assert_not_called()
417417

418+
def test_split_job_smallest(self, parent_job, grouped_jobs):
419+
grouped_jobs[0].failed = True
420+
grouped_jobs[0].split_job.side_effect = ValueError("Mocking smallest size")
421+
422+
# arbitrarily testing this X times, the max attempts is handled by async_job_manager rather than the job itself.
423+
count = 0
424+
while count < 10:
425+
split_jobs = parent_job.split_job()
426+
assert len(split_jobs) == len(
427+
grouped_jobs
428+
), "attempted to split job at smallest size so should just restart job meaning same no. of jobs"
429+
grouped_jobs[0].attempt_number += 1
430+
count += 1
431+
418432
def test_str(self, parent_job, grouped_jobs):
419433
assert str(parent_job) == f"ParentAsyncJob({grouped_jobs[0]} ... {len(grouped_jobs) - 1} jobs more)"

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py

+23
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,26 @@ def update_job_behaviour():
151151

152152
with pytest.raises(JobException, match=f"{jobs[1]}: failed more than {InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS} times."):
153153
next(manager.completed_jobs(), None)
154+
155+
def test_nested_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock):
156+
"""Manager should fail when a nested job within a ParentAsyncJob failed too many times"""
157+
158+
def update_job_behaviour():
159+
jobs[1].failed = True
160+
sub_jobs[1].failed = True
161+
sub_jobs[1].attempt_number = InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS
162+
yield from range(10)
163+
164+
update_job_mock.side_effect = update_job_behaviour()
165+
sub_jobs = [
166+
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
167+
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
168+
]
169+
jobs = [
170+
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
171+
mocker.Mock(spec=ParentAsyncJob, _jobs=sub_jobs, attempt_number=1, failed=False, completed=False),
172+
]
173+
manager = InsightAsyncJobManager(api=api, jobs=jobs)
174+
175+
with pytest.raises(JobException):
176+
next(manager.completed_jobs(), None)

docs/integrations/sources/facebook-marketing.md

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev
108108

109109
| Version | Date | Pull Request | Subject |
110110
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
111+
| 0.2.45 | 2022-05-03 | [12390](https://github.com/airbytehq/airbyte/pull/12390) | Better retry logic for split-up async jobs |
111112
| 0.2.44 | 2022-04-14 | [11751](https://github.com/airbytehq/airbyte/pull/11751) | Update API to directly initialise an AdAccount with the given ID |
112113
| 0.2.43 | 2022-04-13 | [11801](https://github.com/airbytehq/airbyte/pull/11801) | Fix `user_tos_accepted` schema to be an object |
113114
| 0.2.42 | 2022-04-06 | [11761](https://github.com/airbytehq/airbyte/pull/11761) | Upgrade Facebook Python SDK to version 13 |

0 commit comments

Comments
 (0)