Skip to content

Commit 8e11a09

Browse files
davinchiagl-pix
authored andcommitted
Fix scheduler race condition. (#4691)
1 parent 2875270 commit 8e11a09

File tree

2 files changed

+113
-7
lines changed

2 files changed

+113
-7
lines changed

airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package io.airbyte.scheduler.app;
2626

2727
import com.google.common.annotations.VisibleForTesting;
28+
import com.google.common.collect.Sets;
2829
import io.airbyte.commons.concurrency.LifecycledCallable;
2930
import io.airbyte.commons.enums.Enums;
3031
import io.airbyte.config.helpers.LogClientSingleton;
@@ -36,7 +37,9 @@
3637
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
3738
import java.nio.file.Path;
3839
import java.util.Optional;
40+
import java.util.Set;
3941
import java.util.concurrent.ExecutorService;
42+
import java.util.function.Consumer;
4043
import org.slf4j.Logger;
4144
import org.slf4j.LoggerFactory;
4245
import org.slf4j.MDC;
@@ -50,6 +53,9 @@ public class JobSubmitter implements Runnable {
5053
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
5154
private final JobTracker jobTracker;
5255

56+
// See attemptJobSubmit() to understand the need for this Concurrent Set.
57+
private final Set<Long> runningJobs = Sets.newConcurrentHashSet();
58+
5359
public JobSubmitter(final ExecutorService threadPool,
5460
final JobPersistence persistence,
5561
final TemporalWorkerRunFactory temporalWorkerRunFactory,
@@ -67,18 +73,42 @@ public void run() {
6773

6874
final Optional<Job> nextJob = persistence.getNextJob();
6975

70-
nextJob.ifPresent(job -> {
71-
trackSubmission(job);
72-
submitJob(job);
73-
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
74-
});
76+
nextJob.ifPresent(attemptJobSubmit());
7577

7678
LOGGER.debug("Completed Job-Submitter...");
7779
} catch (Throwable e) {
7880
LOGGER.error("Job Submitter Error", e);
7981
}
8082
}
8183

84+
/**
85+
* Since job submission and job execution happen in two separate thread pools, and job execution is
86+
* what removes a job from the submission queue, it is possible for a job to be submitted multiple
87+
* times.
88+
*
89+
* This synchronised block guarantees only a single thread can utilise the concurrent set to decide
90+
* whether a job should be submitted. This job id is added here, and removed in the finish block of
91+
* {@link #submitJob(Job)}.
92+
*
93+
* Since {@link JobPersistence#getNextJob()} returns the next queued job, this solution cause
94+
* head-of-line blocking as the JobSubmitter tries to submit the same job. However, this suggests
95+
* the Worker Pool needs more workers and is inevitable when dealing with pending jobs.
96+
*
97+
* See https://github.com/airbytehq/airbyte/issues/4378 for more info.
98+
*/
99+
synchronized private Consumer<Job> attemptJobSubmit() {
100+
return job -> {
101+
if (!runningJobs.contains(job.getId())) {
102+
runningJobs.add(job.getId());
103+
trackSubmission(job);
104+
submitJob(job);
105+
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
106+
} else {
107+
LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId());
108+
}
109+
};
110+
}
111+
82112
@VisibleForTesting
83113
void submitJob(Job job) {
84114
final WorkerRun workerRun = temporalWorkerRunFactory.create(job);
@@ -94,7 +124,6 @@ void submitJob(Job job) {
94124
final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
95125
final long persistedAttemptId = persistence.createAttempt(job.getId(), logFilePath);
96126
assertSameIds(attemptNumber, persistedAttemptId);
97-
98127
LogClientSingleton.setJobMdc(workerRun.getJobRoot());
99128
})
100129
.setOnSuccess(output -> {
@@ -114,7 +143,10 @@ void submitJob(Job job) {
114143
persistence.failAttempt(job.getId(), attemptNumber);
115144
trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
116145
})
117-
.setOnFinish(MDC::clear)
146+
.setOnFinish(() -> {
147+
runningJobs.remove(job.getId());
148+
MDC.clear();
149+
})
118150
.build());
119151
}
120152

airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,14 @@
5858
import java.nio.file.Path;
5959
import java.util.Map;
6060
import java.util.Optional;
61+
import java.util.concurrent.Executors;
62+
import java.util.concurrent.atomic.AtomicInteger;
6163
import java.util.concurrent.atomic.AtomicReference;
6264
import org.junit.jupiter.api.BeforeEach;
65+
import org.junit.jupiter.api.Nested;
6366
import org.junit.jupiter.api.Test;
6467
import org.mockito.InOrder;
68+
import org.mockito.Mockito;
6569
import org.slf4j.MDC;
6670

6771
public class JobSubmitterTest {
@@ -226,4 +230,74 @@ void testMDC() throws Exception {
226230
assertTrue(MDC.getCopyOfContextMap().isEmpty());
227231
}
228232

233+
@Nested
234+
class OnlyOneJobIdRunning {
235+
236+
/**
237+
* See {@link JobSubmitter#attemptJobSubmit()} to understand why we need to test that only one job
238+
* id can be successfully submited at once.
239+
*/
240+
@Test
241+
public void testOnlyOneJobCanBeSubmittedAtOnce() throws Exception {
242+
var jobDone = new AtomicReference<>(false);
243+
when(workerRun.call()).thenAnswer((a) -> {
244+
Thread.sleep(5000);
245+
jobDone.set(true);
246+
return SUCCESS_OUTPUT;
247+
});
248+
249+
// Simulate the same job being submitted over and over again.
250+
var simulatedJobSubmitterPool = Executors.newFixedThreadPool(10);
251+
var submitCounter = new AtomicInteger(0);
252+
while (!jobDone.get()) {
253+
// This sleep mimics our SchedulerApp loop.
254+
Thread.sleep(1000);
255+
simulatedJobSubmitterPool.submit(() -> {
256+
if (!jobDone.get()) {
257+
jobSubmitter.run();
258+
submitCounter.incrementAndGet();
259+
}
260+
});
261+
}
262+
263+
simulatedJobSubmitterPool.shutdownNow();
264+
verify(persistence, Mockito.times(submitCounter.get())).getNextJob();
265+
// Assert that the job is actually only submitted once.
266+
verify(jobSubmitter, Mockito.times(1)).submitJob(Mockito.any());
267+
}
268+
269+
@Test
270+
public void testSuccessShouldUnlockId() throws Exception {
271+
when(workerRun.call()).thenReturn(SUCCESS_OUTPUT);
272+
273+
jobSubmitter.run();
274+
275+
// This sleep mimics our SchedulerApp loop.
276+
Thread.sleep(1000);
277+
278+
// If the id was not removed, the second call would not trigger submitJob().
279+
jobSubmitter.run();
280+
281+
verify(persistence, Mockito.times(2)).getNextJob();
282+
verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
283+
}
284+
285+
@Test
286+
public void testFailureShouldUnlockId() throws Exception {
287+
when(workerRun.call()).thenThrow(new RuntimeException());
288+
289+
jobSubmitter.run();
290+
291+
// This sleep mimics our SchedulerApp loop.
292+
Thread.sleep(1000);
293+
294+
// If the id was not removed, the second call would not trigger submitJob().
295+
jobSubmitter.run();
296+
297+
verify(persistence, Mockito.times(2)).getNextJob();
298+
verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
299+
}
300+
301+
}
302+
229303
}

0 commit comments

Comments
 (0)