Skip to content

Commit 80c03ef

Browse files
larsrc-googlecopybara-github
authored andcommitted
Move sending requests and reading responses for multiplex workers into separate subthreads.
Nice symmetry here, but more importantly prevents dynamic execution from killing the workers. Also makes the code nicer. RELNOTES: None. PiperOrigin-RevId: 352389574
1 parent bbac0c3 commit 80c03ef

File tree

3 files changed

+129
-105
lines changed

3 files changed

+129
-105
lines changed

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java

Lines changed: 123 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
package com.google.devtools.build.lib.worker;
1616

17+
1718
import com.google.common.annotations.VisibleForTesting;
1819
import com.google.common.collect.ImmutableList;
19-
import com.google.common.flogger.GoogleLogger;
2020
import com.google.devtools.build.lib.events.Event;
2121
import com.google.devtools.build.lib.events.EventHandler;
2222
import com.google.devtools.build.lib.shell.Subprocess;
@@ -31,8 +31,10 @@
3131
import java.util.ArrayList;
3232
import java.util.List;
3333
import java.util.Optional;
34+
import java.util.concurrent.BlockingQueue;
3435
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.ConcurrentMap;
37+
import java.util.concurrent.LinkedBlockingQueue;
3638
import java.util.concurrent.Semaphore;
3739
import javax.annotation.Nullable;
3840

@@ -43,8 +45,14 @@
4345
* into them to send requests. When a worker process returns a {@code WorkResponse}, {@code
4446
* WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response.
4547
*/
46-
public class WorkerMultiplexer extends Thread {
47-
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
48+
public class WorkerMultiplexer {
49+
/**
50+
* A queue of {@link WorkRequest} instances that need to be sent to the worker. {@link
51+
* WorkerProxy} instances add to this queue, while the requestSender subthread remove requests and
52+
* send them to the worker. This prevents dynamic execution interrupts from corrupting the {@code
53+
* stdin} of the worker process.
54+
*/
55+
private final BlockingQueue<WorkRequest> pendingRequests = new LinkedBlockingQueue<>();
4856
/**
4957
* A map of {@code WorkResponse}s received from the worker process. They are stored in this map
5058
* keyed by the request id until the corresponding {@code WorkerProxy} picks them up.
@@ -80,6 +88,12 @@ public class WorkerMultiplexer extends Thread {
8088
/** For testing only, allow a way to fake subprocesses. */
8189
private SubprocessFactory subprocessFactory;
8290

91+
/** A separate thread that sends requests. */
92+
private Thread requestSender;
93+
94+
/** A separate thread that receives responses. */
95+
private Thread responseReceiver;
96+
8397
/**
8498
* The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared
8599
* at the end of a command execution.
@@ -97,16 +111,15 @@ synchronized void setReporter(@Nullable EventHandler reporter) {
97111
}
98112

99113
/** Reports a string to the user if reporting is enabled. */
100-
private synchronized void report(String s) {
101-
EventHandler r = this.reporter; // Protect against race condition with setReporter().
102-
if (r != null && s != null) {
103-
r.handle(Event.info(s));
114+
private synchronized void report(@Nullable String s) {
115+
if (this.reporter != null && s != null) {
116+
this.reporter.handle(Event.info(s));
104117
}
105118
}
106119

107120
/**
108121
* Creates a worker process corresponding to this {@code WorkerMultiplexer}, if it doesn't already
109-
* exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread.
122+
* exist. Also starts up the subthreads handling reading and writing requests and responses.
110123
*/
111124
public synchronized void createProcess(Path workDir) throws IOException {
112125
if (this.process == null) {
@@ -129,12 +142,25 @@ public synchronized void createProcess(Path workDir) throws IOException {
129142
processBuilder.setStderr(logFile.getPathFile());
130143
processBuilder.setEnv(workerKey.getEnv());
131144
this.process = processBuilder.start();
145+
String id = workerKey.getMnemonic() + "-" + workerKey.hashCode();
146+
// TODO(larsrc): Consider moving sender/receiver threads into separate classes.
147+
this.requestSender =
148+
new Thread(
149+
() -> {
150+
while (process.isAlive() && sendRequest()) {}
151+
});
152+
this.requestSender.setName("multiplexer-request-sender-" + id);
153+
this.requestSender.start();
154+
this.responseReceiver =
155+
new Thread(
156+
() -> {
157+
while (process.isAlive() && readResponse()) {}
158+
});
159+
this.responseReceiver.setName("multiplexer-response-receiver-" + id);
160+
this.responseReceiver.start();
132161
} else if (!this.process.isAlive()) {
133162
throw new IOException("Process is dead");
134163
}
135-
if (!this.isAlive()) {
136-
this.start();
137-
}
138164
}
139165

140166
/**
@@ -157,7 +183,10 @@ public synchronized void destroyMultiplexer() {
157183
wasDestroyed = true;
158184
}
159185

160-
/** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */
186+
/**
187+
* Destroys the worker subprocess. This might block forever if the subprocess refuses to die. It
188+
* is safe to call this multiple times.
189+
*/
161190
private synchronized void destroyProcess() {
162191
boolean wasInterrupted = false;
163192
try {
@@ -171,6 +200,17 @@ private synchronized void destroyProcess() {
171200
}
172201
}
173202
} finally {
203+
// Stop the subthreads only when the process is dead, or their loops will go on.
204+
if (this.requestSender != null) {
205+
this.requestSender.interrupt();
206+
}
207+
if (this.responseReceiver != null) {
208+
this.responseReceiver.interrupt();
209+
}
210+
// Might as well release any waiting workers
211+
for (Semaphore semaphore : responseChecker.values()) {
212+
semaphore.release();
213+
}
174214
// Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/
175215
if (wasInterrupted) {
176216
Thread.currentThread().interrupt(); // preserve interrupted status
@@ -183,17 +223,12 @@ private synchronized void destroyProcess() {
183223
* WorkerProxy}, and so is subject to interrupts by dynamic execution.
184224
*/
185225
public synchronized void putRequest(WorkRequest request) throws IOException {
186-
responseChecker.put(request.getRequestId(), new Semaphore(0));
187-
try {
188-
request.writeDelimitedTo(process.getOutputStream());
189-
process.getOutputStream().flush();
190-
} catch (IOException e) {
191-
// We can't know how much of the request was sent, so we have to assume the worker's input
192-
// now contains garbage.
193-
// TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread?
194-
responseChecker.remove(request.getRequestId());
195-
throw e;
226+
if (!process.isAlive()) {
227+
throw new IOException(
228+
"Attempting to send request " + request.getRequestId() + " to dead process");
196229
}
230+
responseChecker.put(request.getRequestId(), new Semaphore(0));
231+
pendingRequests.add(request);
197232
}
198233

199234
/**
@@ -203,58 +238,99 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
203238
*/
204239
public WorkResponse getResponse(Integer requestId) throws InterruptedException {
205240
try {
241+
if (!process.isAlive()) {
242+
// If the process has died, all we can do is return what may already have been returned.
243+
return workerProcessResponse.get(requestId);
244+
}
245+
206246
Semaphore waitForResponse = responseChecker.get(requestId);
207247

208248
if (waitForResponse == null) {
209249
report("Null response semaphore for " + requestId);
210-
// If the multiplexer is interrupted when a {@code WorkerProxy} is trying to send a request,
211-
// the request is not sent, so there is no need to wait for a response.
212-
return null;
250+
// If there is no semaphore for this request, it probably failed to send, so we just return
251+
// what we got, probably nothing.
252+
return workerProcessResponse.get(requestId);
213253
}
214254

215255
// Wait for the multiplexer to get our response and release this semaphore. The semaphore will
216256
// throw {@code InterruptedException} when the multiplexer is terminated.
217257
waitForResponse.acquire();
218258

219-
WorkResponse workResponse = workerProcessResponse.get(requestId);
220-
return workResponse;
259+
return workerProcessResponse.get(requestId);
221260
} finally {
222261
responseChecker.remove(requestId);
223262
workerProcessResponse.remove(requestId);
224263
}
225264
}
226265

227266
/**
228-
* Waits to read a {@code WorkResponse} from worker process, put that {@code WorkResponse} in
229-
* {@code workerProcessResponse} and release the semaphore for the {@code WorkerProxy}.
267+
* Sends a single pending request, if there are any. Blocks until a request is available.
230268
*
231-
* <p>This is only called on the WorkerMultiplexer thread and so cannot be interrupted by dynamic
232-
* execution cancellation.
269+
* <p>This is only called by the {@code requestSender} thread and so cannot be interrupted by
270+
* dynamic execution cancellation, but only by a call to {@link #destroyProcess()}.
233271
*/
234-
private void waitResponse() throws InterruptedException, IOException {
235-
recordingStream = new RecordingInputStream(this.process.getInputStream());
236-
recordingStream.startRecording(4096);
237-
// TODO(larsrc): Turn this into a loop that also sends requests.
238-
// Allow interrupts while waiting for responses, without conflating it with I/O errors.
239-
while (recordingStream.available() == 0) {
240-
if (!this.process.isAlive()) {
241-
throw new IOException(
242-
String.format("Multiplexer process for %s is dead", workerKey.getMnemonic()));
272+
private boolean sendRequest() {
273+
WorkRequest request;
274+
try {
275+
request = pendingRequests.take();
276+
} catch (InterruptedException e) {
277+
return false;
278+
}
279+
try {
280+
request.writeDelimitedTo(process.getOutputStream());
281+
process.getOutputStream().flush();
282+
} catch (IOException e) {
283+
// We can't know how much of the request was sent, so we have to assume the worker's input
284+
// now contains garbage, and this request is lost.
285+
// TODO(b/177637516): Signal that this action failed for presumably transient reasons.
286+
report("Failed to send request " + request.getRequestId());
287+
Semaphore s = responseChecker.remove(request.getRequestId());
288+
if (s != null) {
289+
s.release();
243290
}
244-
Thread.sleep(1);
291+
// TODO(b/177637516): Leave process in a moribound state so pending responses can be returned.
292+
destroyProcess();
293+
return false;
245294
}
246-
WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
295+
return true;
296+
}
247297

248-
// A null parsedResponse can only happen if the input stream is closed, in which case we
298+
/**
299+
* Reads a {@code WorkResponse} from worker process, puts that {@code WorkResponse} in {@code
300+
* workerProcessResponse}, and releases the semaphore for the {@code WorkerProxy}.
301+
*
302+
* <p>This is only called on the readResponses subthread and so cannot be interrupted by dynamic
303+
* execution cancellation, but only by a call to {@link #destroyProcess()}.
304+
*/
305+
private boolean readResponse() {
306+
recordingStream = new RecordingInputStream(process.getInputStream());
307+
recordingStream.startRecording(4096);
308+
WorkResponse parsedResponse;
309+
try {
310+
parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
311+
} catch (IOException e) {
312+
if (!(e instanceof InterruptedIOException)) {
313+
report(
314+
String.format(
315+
"Error while reading response from multiplexer process for %s: %s",
316+
workerKey.getMnemonic(), e));
317+
}
318+
// We can't know how much of the response was read, so we have to assume the worker's output
319+
// now contains garbage, and we can't reliably read any further responses.
320+
destroyProcess();
321+
return false;
322+
}
323+
// A null parsedResponse can happen if the input stream is closed, in which case we
249324
// drop everything.
250325
if (parsedResponse == null) {
251-
throw new IOException(
326+
report(
252327
String.format(
253-
"Multiplexer process for %s died while reading response", workerKey.getMnemonic()));
328+
"Multiplexer process for %s has closed its output stream", workerKey.getMnemonic()));
329+
destroyProcess();
330+
return false;
254331
}
255332

256333
int requestId = parsedResponse.getRequestId();
257-
258334
workerProcessResponse.put(requestId, parsedResponse);
259335

260336
// TODO(b/151767359): When allowing cancellation, just remove responses that have no matching
@@ -267,61 +343,7 @@ private void waitResponse() throws InterruptedException, IOException {
267343
report(String.format("Multiplexer for %s found no semaphore", workerKey.getMnemonic()));
268344
workerProcessResponse.remove(requestId);
269345
}
270-
}
271-
272-
/** The multiplexer thread that listens to the WorkResponse from worker process. */
273-
@Override
274-
public void run() {
275-
while (this.process.isAlive()) {
276-
try {
277-
waitResponse();
278-
} catch (IOException e) {
279-
// We got this exception while reading from the worker's stdout. We can't trust the
280-
// output any more at that point.
281-
if (this.process.isAlive()) {
282-
destroyProcess();
283-
}
284-
if (e instanceof InterruptedIOException) {
285-
report(
286-
String.format(
287-
"Multiplexer process for %s was interrupted during I/O, aborting multiplexer",
288-
workerKey.getMnemonic()));
289-
} else {
290-
// TODO(larsrc): Output the recorded message.
291-
report(
292-
String.format(
293-
"Multiplexer for %s got IOException reading a response, aborting multiplexer",
294-
workerKey.getMnemonic()));
295-
logger.atWarning().withCause(e).log(
296-
"Caught IOException while waiting for worker response. "
297-
+ "It could be because the worker returned an unparseable response.");
298-
}
299-
} catch (InterruptedException e) {
300-
// This should only happen when the Blaze build has been aborted (failed or cancelled). In
301-
// that case, there may still be some outstanding requests in the worker process, which we
302-
// will let fall on the floor, but we still want to leave the process running for the next
303-
// build.
304-
// TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented.
305-
for (Semaphore semaphore : responseChecker.values()) {
306-
semaphore.release();
307-
}
308-
}
309-
}
310-
synchronized (this) {
311-
releaseAllSemaphores();
312-
}
313-
}
314-
315-
/**
316-
* Release all the semaphores and clear the related maps. Must only be called when we are shutting
317-
* down the multiplexer.
318-
*/
319-
private void releaseAllSemaphores() {
320-
for (Semaphore semaphore : responseChecker.values()) {
321-
semaphore.release();
322-
}
323-
responseChecker.clear();
324-
workerProcessResponse.clear();
346+
return true;
325347
}
326348

327349
String getRecordingStreamMessage() {

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ public static synchronized void removeInstance(WorkerKey key) throws UserExecExc
8080
}
8181
instanceInfo.decreaseRefCount();
8282
if (instanceInfo.getRefCount() == 0) {
83-
instanceInfo.getWorkerMultiplexer().interrupt();
8483
instanceInfo.getWorkerMultiplexer().destroyMultiplexer();
8584
multiplexerInstance.remove(key);
8685
}

src/test/shell/integration/bazel_worker_multiplexer_test.sh

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,13 @@ EOF
205205
bazel build :hello_world_1 &> $TEST_log \
206206
|| fail "build failed"
207207

208-
bazel build :hello_world_2 &> $TEST_log \
209-
&& fail "expected build to failed" || true
208+
bazel build --worker_verbose :hello_world_2 >> $TEST_log 2>&1 \
209+
&& fail "expected build to fail" || true
210+
211+
error_msgs=$(egrep -o -- 'Worker process (did not return a|returned an unparseable) WorkResponse' "$TEST_log")
210212

211-
expect_log "Worker process quit or closed its stdin stream when we tried to send a WorkRequest"
213+
[ -n "$error_msgs" ] \
214+
|| fail "expected error message not found"
212215
}
213216

214217
function test_worker_restarts_when_worker_binary_changes() {

0 commit comments

Comments
 (0)