Skip to content

Commit 6080c1e

Browse files
larsrc-googlekatre
authored andcommitted
Let workers finish lost races without delaying dynamic execution.
If a worker is blocked on reading a response, it doesn't listen for interrupts. This changes blocking on reading to blocking on a sleep-loop, and if interrupted, the worker gets to finish in a separate thread before returning to the pool. This lets the action finish immediately. RELNOTES: None. PiperOrigin-RevId: 368207735
1 parent 671e048 commit 6080c1e

File tree

5 files changed

+73
-23
lines changed

5 files changed

+73
-23
lines changed

src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,24 +120,12 @@ void putRequest(WorkRequest request) throws IOException {
120120
@Override
121121
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
122122
recordingInputStream.startRecording(4096);
123-
// Ironically, we don't allow interrupts during dynamic execution, since we can't cancel
124-
// the worker short of destroying it.
125-
if (!workerKey.isSpeculative()) {
126-
while (recordingInputStream.available() == 0) {
127-
try {
128-
Thread.sleep(10);
129-
} catch (InterruptedException e) {
130-
// This should only happen when not in dynamic execution, so we can safely kill the
131-
// worker.
132-
destroy();
133-
throw e;
134-
}
135-
if (!process.isAlive()) {
136-
throw new IOException(
137-
String.format(
138-
"Worker process for %s died while waiting for response",
139-
workerKey.getMnemonic()));
140-
}
123+
while (recordingInputStream.available() == 0) {
124+
Thread.sleep(10);
125+
if (!process.isAlive()) {
126+
throw new IOException(
127+
String.format(
128+
"Worker process for %s died while waiting for response", workerKey.getMnemonic()));
141129
}
142130
}
143131
return workerProtocol.getResponse();

src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,10 @@ WorkResponse execInWorker(
454454

455455
try {
456456
response = worker.getResponse(request.getRequestId());
457+
} catch (InterruptedException e) {
458+
finishWorkAsync(key, worker, request);
459+
worker = null;
460+
throw e;
457461
} catch (IOException e) {
458462
restoreInterrupt(e);
459463
// If protobuf or json reader couldn't parse the response, try to print whatever the
@@ -511,6 +515,41 @@ WorkResponse execInWorker(
511515
return response;
512516
}
513517

518+
/**
519+
* Starts a thread to collect the response from a worker when it's no longer of interest.
520+
*
521+
* <p>This can happen either when we lost the race in dynamic execution or the build got
522+
* interrupted. This takes ownership of the worker for purposes of returning it to the worker
523+
* pool.
524+
*/
525+
private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) {
526+
Thread reaper =
527+
new Thread(
528+
() -> {
529+
Worker w = worker;
530+
try {
531+
w.getResponse(request.getRequestId());
532+
} catch (IOException | InterruptedException e1) {
533+
// If this happens, we either can't trust the output of the worker, or we got
534+
// interrupted while handling being interrupted. In the latter case, let's stop
535+
// trying and just destroy the worker. If it's a singleplex worker, there will
536+
// be a dangling response that we don't want to keep trying to read, so we destroy
537+
// the worker.
538+
try {
539+
workers.invalidateObject(key, w);
540+
w = null;
541+
} catch (IOException | InterruptedException e2) {
542+
// The reaper thread can't do anything useful about this.
543+
}
544+
} finally {
545+
if (w != null) {
546+
workers.returnObject(key, w);
547+
}
548+
}
549+
});
550+
reaper.start();
551+
}
552+
514553
private static void restoreInterrupt(IOException e) {
515554
if (e instanceof InterruptedIOException) {
516555
Thread.currentThread().interrupt();

src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@
3535
import java.util.Map;
3636
import java.util.Random;
3737
import java.util.UUID;
38+
import java.util.concurrent.Semaphore;
3839
import java.util.regex.Matcher;
3940
import java.util.regex.Pattern;
41+
import sun.misc.Signal;
42+
import sun.misc.SignalHandler;
4043

4144
/** An example implementation of a worker process that is used for integration tests. */
4245
public class ExampleWorker {
@@ -79,6 +82,23 @@ private static void runPersistentWorker(ExampleWorkerOptions workerOptions) thro
7982
PrintStream originalStdOut = System.out;
8083
PrintStream originalStdErr = System.err;
8184

85+
if (workerOptions.waitForSignal) {
86+
Semaphore signalSem = new Semaphore(0);
87+
Signal.handle(
88+
new Signal("HUP"),
89+
new SignalHandler() {
90+
@Override
91+
public void handle(Signal sig) {
92+
signalSem.release();
93+
}
94+
});
95+
try {
96+
signalSem.acquire();
97+
} catch (InterruptedException e) {
98+
System.out.println("Interrupted while waiting for signal");
99+
e.printStackTrace();
100+
}
101+
}
82102
ExampleWorkerProtocol workerProtocol = null;
83103
switch (workerOptions.workerProtocol) {
84104
case JSON:

src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ public static class ExampleWorkOptions extends OptionsBase {
135135
)
136136
public boolean hardPoison;
137137

138+
@Option(
139+
name = "wait_for_signal",
140+
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
141+
effectTags = {OptionEffectTag.NO_OP},
142+
defaultValue = "false",
143+
help = "Don't send a response until receiving a SIGXXXX.")
144+
public boolean waitForSignal;
145+
138146
/** Enum converter for --worker_protocol. */
139147
public static class WorkerProtocolEnumConverter
140148
extends EnumConverter<ExecutionRequirements.WorkerProtocolFormat> {

src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,6 @@ private void verifyGetResponseFailure(String responseString, String expectedErro
174174
assertThat(ex).hasMessageThat().contains(expectedError);
175175
}
176176

177-
@Test
178-
public void testGetResponse_json_emptyString_throws() throws IOException {
179-
verifyGetResponseFailure("", "Could not parse json work request correctly");
180-
}
181-
182177
@Test
183178
public void testGetResponse_badJson_throws() throws IOException {
184179
verifyGetResponseFailure(

0 commit comments

Comments
 (0)