
Commit 44782dc

copybara-github authored and committed
An array-based task queue to address performance regressions.
TaskFifo has higher performance than `ConcurrentLinkedQueue` because primary contention is resolved through atomic increment, which translates into `LOCK XADD` and doesn't require a CAS loop. TaskFifo also generates negligible per-task garbage, only doing so when there is a descheduled or unusually slow take thread.

PiperOrigin-RevId: 507600172
Change-Id: Iab60eec942a1b12e0ebe0fb3947c3b20a3bf8997
1 parent e9316f0 commit 44782dc
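
The contention argument in the commit message can be illustrated with a minimal, hypothetical sketch (this is not the TaskFifo added by the commit, whose source lives in the other changed files): producers and consumers claim array slots with a single atomic increment, which the JIT compiles to LOCK XADD on x86, instead of retrying a compareAndSet loop the way a linked queue must.

// Minimal sketch only (hypothetical names); assumes an external bound keeps the
// number of enqueued tasks at or below the array capacity, as PriorityWorkerPool
// does with TASKS_MAX_VALUE, and that takes never outnumber appends.
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class ArrayFifoSketch {
  private final AtomicReferenceArray<Runnable> slots;
  private final AtomicLong appendIndex = new AtomicLong(); // getAndIncrement -> LOCK XADD
  private final AtomicLong takeIndex = new AtomicLong();
  private final int mask;

  ArrayFifoSketch(int capacityPowerOfTwo) {
    slots = new AtomicReferenceArray<>(capacityPowerOfTwo);
    mask = capacityPowerOfTwo - 1;
  }

  void append(Runnable task) {
    // Primary contention resolves with one atomic increment, no CAS retry loop.
    int i = (int) (appendIndex.getAndIncrement() & mask);
    slots.set(i, task);
  }

  Runnable take() {
    int i = (int) (takeIndex.getAndIncrement() & mask);
    Runnable task;
    // Spins only while the claimed slot has not yet been published by its appender.
    while ((task = slots.getAndSet(i, null)) == null) {
      Thread.onSpinWait();
    }
    return task;
  }
}

A design along these lines allocates nothing per task beyond the task object itself, which matches the "negligible per-task garbage" claim; per the commit message, the real TaskFifo only allocates when a take thread is descheduled or unusually slow.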

File tree

4 files changed: +695 -52 lines changed


src/main/java/com/google/devtools/build/lib/concurrent/PriorityWorkerPool.java

Lines changed: 19 additions & 52 deletions
@@ -16,6 +16,8 @@
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress;
+import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress;
 import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_CPU_HEAVY_TASK;
 import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_TASK;
 import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.IDLE;
@@ -32,7 +34,6 @@
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
@@ -68,12 +69,11 @@ final class PriorityWorkerPool {
    *
    * <p>An interesting alternative to consider is to place unprioritized tasks directly into {@link
    * #pool}, which could reduce the work performed by the system. Doing this results in about a
-   * {@code 4%} end-to-end analysis regression in our benchmark. The reasons are not clear, but
-   * perhaps polling from a {@link ConcurrentLinkedQueue}, as implemented in {@link
-   * WorkerThread#runLoop} is more efficient than random scanning of {@link ForkJoinPool}, or it
-   * could be a domain specific reason having to do with differences in the resulting task ordering.
+   * {@code 4%} end-to-end regression in our benchmark. The likely cause for this is that FIFO
+   * behavior is very important for performance because it reflects the ordering of prioritized
+   * tasks.
    */
-  private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
+  private final TaskFifo queue;
 
   private final ConcurrentSkipListSet<ComparableRunnable> cpuHeavyQueue =
       new ConcurrentSkipListSet<>();
@@ -131,10 +131,14 @@ final class PriorityWorkerPool {
     this.pool = newForkJoinPool();
     this.errorClassifier = errorClassifier;
 
-    long baseAddress = PaddedAddresses.createPaddedBaseAddress(2);
+    long baseAddress = createPaddedBaseAddress(4);
     cleaner.register(this, new AddressFreer(baseAddress));
-    this.countersAddress = PaddedAddresses.getAlignedAddress(baseAddress, /* offset= */ 0);
-    this.queueSizeAddress = PaddedAddresses.getAlignedAddress(baseAddress, /* offset= */ 1);
+    this.countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
+    this.queue =
+        new TaskFifo(
+            /* sizeAddress= */ getAlignedAddress(baseAddress, /* offset= */ 1),
+            /* appendIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 2),
+            /* takeIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 3));
 
     resetExecutionCounters();
   }
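
The constructor now carves four aligned slots out of one padded base allocation: the counters word plus the FIFO's size, append index, and take index. A plausible reading, hedged because PaddedAddresses itself is not shown in this hunk, is that each counter gets its own cache line so threads hammering one index do not cause false sharing on the others. An on-heap analogue of that layout (illustrative names, not the real API):

import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical stand-in for the padded off-heap slots; one 64-byte stride per
// counter keeps each counter on its own cache line.
final class PaddedSlotsSketch {
  private static final int STRIDE = 8; // 8 longs = 64 bytes
  private final AtomicLongArray slots = new AtomicLongArray(4 * STRIDE);

  long getAndIncrement(int slot) {
    return slots.getAndAdd(slot * STRIDE, 1); // one LOCK XADD per update
  }

  long get(int slot) {
    return slots.get(slot * STRIDE);
  }
}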
@@ -159,7 +163,7 @@ void execute(Runnable rawTask) {
       }
     }
 
-    while (!tryAppend(rawTask)) {
+    while (!queue.tryAppend(rawTask)) {
       // If the queue is full, this thread donates some work to reduce the queue.
       if (!tryAcquireTask()) {
         // This should be very hard to reach if the queue is full except under cancellation. It's
@@ -180,16 +184,6 @@ void execute(Runnable rawTask) {
     }
   }
 
-  private boolean tryAppend(Runnable task) {
-    if (queueSizeIncrementAndGet() > TASKS_MAX_VALUE) {
-      queueSizeDecrement();
-      return false;
-    }
-
-    queue.add(task);
-    return true;
-  }
-
   /**
    * An object to {@link Object#wait} or {@link Object#notifyAll} on for quiescence.
    *
@@ -275,7 +269,7 @@ public String toString() {
     }
     return toStringHelper(this)
        .add("available", formatSnapshot(getExecutionCounters()))
-        .add("|queue|", queueSize())
+        .add("|queue|", queue.size())
        .add("|cpu queue|", cpuHeavyQueue.size())
        .add("threads", threadStates)
        .add("unhandled", unhandled.get())
@@ -422,8 +416,7 @@ boolean tryDoQueuedWork() {
 
   private void dequeueTaskAndRun() {
     try {
-      var task = queue.poll();
-      UNSAFE.getAndAddInt(null, queueSizeAddress, -1);
+      var task = queue.take();
       task.run();
     } catch (Throwable uncaught) {
       handleUncaughtError(uncaught);
@@ -453,7 +446,7 @@ private void dequeueCpuHeavyTaskAndRun() {
   private static final long TASKS_MASK = 0x0000_001F_FF80_0000L;
   private static final int TASKS_BIT_OFFSET = 23;
   private static final long ONE_TASK = 1L << TASKS_BIT_OFFSET;
-  @VisibleForTesting static final int TASKS_MAX_VALUE = (int) (TASKS_MASK >> TASKS_BIT_OFFSET);
+  static final int TASKS_MAX_VALUE = (int) (TASKS_MASK >> TASKS_BIT_OFFSET);
 
   private static final long CPU_HEAVY_TASKS_MASK = 0x0000_0000_007F_FFFFL;
   private static final int CPU_HEAVY_TASKS_BIT_OFFSET = 0;
@@ -477,8 +470,8 @@ private void dequeueCpuHeavyTaskAndRun() {
    * <ol>
    *   <li>Canceled - (1 bit) true for cancelled.
    *   <li>CPU Permits - (11 bits) how many CPU heavy permits are available.
-   *   <li>Threads - (16 bits) how many threads are available.
-   *   <li>Tasks - (13 bits) how many non-CPU heavy tasks are inflight.
+   *   <li>Threads - (15 bits) how many threads are available.
+   *   <li>Tasks - (14 bits) how many non-CPU heavy tasks are inflight.
    *   <li>CPU Heavy Tasks - (23 bits) how many CPU heavy tasks are inflight.
    * </ol>
    *
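
With the corrected widths the layout again sums to 64 bits (1 + 11 + 15 + 14 + 23), and the constants visible in this diff are consistent with it. A small standalone check, assuming only the masks and offsets shown above (not part of the commit):

final class CounterLayoutCheck {
  static final long TASKS_MASK = 0x0000_001F_FF80_0000L;
  static final int TASKS_BIT_OFFSET = 23;
  static final long CPU_HEAVY_TASKS_MASK = 0x0000_0000_007F_FFFFL;

  public static void main(String[] args) {
    System.out.println(Long.bitCount(TASKS_MASK));               // 14 task bits
    System.out.println(Long.bitCount(CPU_HEAVY_TASKS_MASK));     // 23 CPU-heavy bits
    System.out.println((int) (TASKS_MASK >> TASKS_BIT_OFFSET));  // TASKS_MAX_VALUE == 16383
  }
}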
@@ -512,7 +505,6 @@ private void resetExecutionCounters() {
         countersAddress,
         (((long) poolSize) << THREADS_BIT_OFFSET)
             | (((long) cpuPermits) << CPU_PERMITS_BIT_OFFSET));
-    UNSAFE.putInt(null, queueSizeAddress, 0);
   }
 
   private boolean acquireThreadElseReleaseTask() {
@@ -625,31 +617,6 @@ private boolean tryUpdateExecutionCounters(long snapshot, long target) {
     return UNSAFE.compareAndSwapLong(null, countersAddress, snapshot, target);
   }
 
-  /**
-   * Address of the {@code int} queue size.
-   *
-   * <p>The queue size is used to detect when adding to the queue might overflow. Having a fixed cap
-   * on queue size enables using fewer bits to track its state, simplifying code paths having high
-   * contention.
-   *
-   * <p>The queue size can be transiently greater than, but is never less than the actual queue
-   * size. This property is maintained by always incrementing the counter before inserting into the
-   * queue and always decrementing after inserting into the queue.
-   */
-  private final long queueSizeAddress;
-
-  private int queueSize() {
-    return UNSAFE.getIntVolatile(null, queueSizeAddress);
-  }
-
-  private int queueSizeIncrementAndGet() {
-    return UNSAFE.getAndAddInt(null, queueSizeAddress, 1) + 1;
-  }
-
-  private void queueSizeDecrement() {
-    UNSAFE.getAndAddInt(null, queueSizeAddress, -1);
-  }
-
   private static String formatSnapshot(long snapshot) {
     return String.format(
         "{cancelled=%b, threads=%d, cpuPermits=%d, tasks=%d, cpuHeavyTasks=%d}",
