Skip to content

Commit 3cc9060

Browse files
committed
manager now fails on nested jobs hitting max attempts
1 parent 0fb5d03 commit 3cc9060

File tree

4 files changed

+36
-15
lines changed

4 files changed

+36
-15
lines changed

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py

+3-8
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ class Status(str, Enum):
5858
class AsyncJob(ABC):
5959
"""Abstract AsyncJob base class"""
6060

61-
# max attempts for a job before erroring out
62-
max_attempts: int = 10 # TODO: verify a sane number for this
63-
6461
def __init__(self, api: FacebookAdsApi, interval: pendulum.Period):
6562
"""Init generic async job
6663
@@ -163,11 +160,9 @@ def split_job(self) -> List["AsyncJob"]:
163160
if job.failed:
164161
try:
165162
new_jobs.extend(job.split_job())
166-
except RuntimeError as split_limit_error:
163+
except ValueError as split_limit_error:
167164
logger.error(split_limit_error)
168-
if job.attempt_number > job.max_attempts:
169-
raise RuntimeError(f"{job} at smallest split size and still failing after {job.max_attempts} retries.")
170-
logger.info(f'can\'t split "{job}" any smaller, attempting to restart instead.')
165+
logger.info(f'can\'t split "{job}" any smaller, attempting to retry the job.')
171166
job.restart()
172167
new_jobs.append(job)
173168
else:
@@ -213,7 +208,7 @@ def split_job(self) -> List["AsyncJob"]:
213208
return self._split_by_edge_class(AdSet)
214209
elif isinstance(self._edge_object, AdSet):
215210
return self._split_by_edge_class(Ad)
216-
raise RuntimeError("The job is already splitted to the smallest size.")
211+
raise ValueError("The job is already splitted to the smallest size.")
217212

218213
def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Type[Ad]]) -> List[AsyncJob]:
219214
"""Split insight job by creating insight jobs from lower edge object, i.e.

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py

+6
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
9494
self._wait_throttle_limit_down()
9595
for job in self._running_jobs:
9696
if job.failed:
97+
if isinstance(job, ParentAsyncJob):
98+
# a ParentAsyncJob wraps a group of nested jobs (job._jobs)
99+
# raise as soon as any nested job has reached MAX_NUMBER_OF_ATTEMPTS (the check below is >=, not >)
100+
for nested_job in job._jobs:
101+
if nested_job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
102+
raise JobException(f"{nested_job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
97103
if job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
98104
raise JobException(f"{job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
99105
elif job.attempt_number == 2:

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py

+4-7
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ def test_split_job_smallest(self, mocker, api):
336336
params = {"time_increment": 1, "breakdowns": []}
337337
job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params)
338338

339-
with pytest.raises(RuntimeError, match="The job is already splitted to the smallest size."):
339+
with pytest.raises(ValueError, match="The job is already splitted to the smallest size."):
340340
job.split_job()
341341

342342

@@ -416,21 +416,18 @@ def test_split_job(self, parent_job, grouped_jobs, mocker):
416416
job.split_job.assert_not_called()
417417

418418
def test_split_job_smallest(self, parent_job, grouped_jobs):
419-
grouped_jobs[0].max_attempts = InsightAsyncJob.max_attempts
420419
grouped_jobs[0].failed = True
421-
grouped_jobs[0].split_job.side_effect = RuntimeError("Mocking smallest size")
420+
grouped_jobs[0].split_job.side_effect = ValueError("Mocking smallest size")
422421

422+
# the iteration count here is arbitrary: the max-attempts limit is enforced by async_job_manager, not by the job itself.
423423
count = 0
424-
while count < InsightAsyncJob.max_attempts:
424+
while count < 10:
425425
split_jobs = parent_job.split_job()
426426
assert len(split_jobs) == len(
427427
grouped_jobs
428428
), "attempted to split job at smallest size so should just restart job meaning same no. of jobs"
429429
grouped_jobs[0].attempt_number += 1
430430
count += 1
431431

432-
with pytest.raises(RuntimeError): # now that we've hit max_attempts, we should error out
433-
parent_job.split_job()
434-
435432
def test_str(self, parent_job, grouped_jobs):
436433
assert str(parent_job) == f"ParentAsyncJob({grouped_jobs[0]} ... {len(grouped_jobs) - 1} jobs more)"

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py

+23
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,26 @@ def update_job_behaviour():
151151

152152
with pytest.raises(JobException, match=f"{jobs[1]}: failed more than {InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS} times."):
153153
next(manager.completed_jobs(), None)
154+
155+
def test_nested_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock):
156+
"""Manager should fail when a nested job within a ParentAsyncJob failed too many times"""
157+
158+
def update_job_behaviour():
159+
jobs[1].failed = True
160+
sub_jobs[1].failed = True
161+
sub_jobs[1].attempt_number = InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS
162+
yield from range(10)
163+
164+
update_job_mock.side_effect = update_job_behaviour()
165+
sub_jobs = [
166+
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
167+
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
168+
]
169+
jobs = [
170+
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
171+
mocker.Mock(spec=ParentAsyncJob, _jobs=sub_jobs, attempt_number=1, failed=False, completed=False),
172+
]
173+
manager = InsightAsyncJobManager(api=api, jobs=jobs)
174+
175+
with pytest.raises(JobException):
176+
next(manager.completed_jobs(), None)

0 commit comments

Comments
 (0)