Skip to content

Commit a607d9d

Browse files
larsrc-google and copybara-github
authored and committed
Never create more than one process per WorkerMultiplexer.
It turns out that the ability to re-create a process makes everything complicated. Instead, just let the WorkerMultiplexer instance fall to the floor and create a new one as needed. This change also restores interrupts in more places, handles some non-I/O exceptions better in WorkerSpawnRunner, checks a few more edge cases around the multiplexer, makes the multiplexer try not to get interrupted during the actual read, avoids creating unnecessary WorkerMultiplexer garbage, removes shutdown hooks on WorkerProxy destruction, sets the multiplexer reporter earlier, and improves some error messages.

RELNOTES: n/a
PiperOrigin-RevId: 351606949
1 parent f431b0c commit a607d9d

File tree

8 files changed

+142
-141
lines changed

8 files changed

+142
-141
lines changed

src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,9 @@ String getRecordingStreamMessage() {
163163
recordingInputStream.readRemaining();
164164
return recordingInputStream.getRecordedDataAsString();
165165
}
166+
167+
@Override
168+
public String toString() {
169+
return workerKey.getMnemonic() + " worker #" + workerId;
170+
}
166171
}

src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void setOptions(WorkerOptions workerOptions) {
5353
}
5454

5555
@Override
56-
public Worker create(WorkerKey key) throws Exception {
56+
public Worker create(WorkerKey key) {
5757
int workerId = pidCounter.getAndIncrement();
5858
String workTypeName = WorkerKey.makeWorkerTypeName(key.getProxied());
5959
Path logFile =
@@ -66,9 +66,7 @@ public Worker create(WorkerKey key) throws Exception {
6666
worker = new SandboxedWorker(key, workerId, workDir, logFile);
6767
} else if (key.getProxied()) {
6868
WorkerMultiplexer workerMultiplexer = WorkerMultiplexerManager.getInstance(key, logFile);
69-
worker =
70-
new WorkerProxy(
71-
key, workerId, key.getExecRoot(), workerMultiplexer.getLogFile(), workerMultiplexer);
69+
worker = new WorkerProxy(key, workerId, workerMultiplexer.getLogFile(), workerMultiplexer);
7270
} else {
7371
worker = new SingleplexWorker(key, workerId, key.getExecRoot(), logFile);
7472
}
@@ -112,13 +110,12 @@ public PooledObject<Worker> wrap(Worker worker) {
112110
@Override
113111
public void destroyObject(WorkerKey key, PooledObject<Worker> p) throws Exception {
114112
if (workerOptions.workerVerbose) {
113+
int workerId = p.getObject().getWorkerId();
115114
reporter.handle(
116115
Event.info(
117116
String.format(
118117
"Destroying %s %s (id %d)",
119-
key.getMnemonic(),
120-
WorkerKey.makeWorkerTypeName(key.getProxied()),
121-
p.getObject().getWorkerId())));
118+
key.getMnemonic(), WorkerKey.makeWorkerTypeName(key.getProxied()), workerId)));
122119
}
123120
p.getObject().destroy();
124121
}
@@ -161,10 +158,10 @@ public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
161158
}
162159
return false;
163160
}
164-
boolean hashMatches =
165-
key.getWorkerFilesCombinedHash().equals(worker.getWorkerFilesCombinedHash());
161+
boolean filesChanged =
162+
!key.getWorkerFilesCombinedHash().equals(worker.getWorkerFilesCombinedHash());
166163

167-
if (workerOptions.workerVerbose && reporter != null && !hashMatches) {
164+
if (workerOptions.workerVerbose && reporter != null && filesChanged) {
168165
StringBuilder msg = new StringBuilder();
169166
msg.append(
170167
String.format(
@@ -191,6 +188,6 @@ public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
191188
reporter.handle(Event.warn(msg.toString()));
192189
}
193190

194-
return hashMatches;
191+
return !filesChanged;
195192
}
196193
}

src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public Iterable<Class<? extends OptionsBase>> getCommandOptions(Command command)
6767
public void beforeCommand(CommandEnvironment env) {
6868
this.env = env;
6969
env.getEventBus().register(this);
70+
WorkerMultiplexerManager.beforeCommand(env);
7071
}
7172

7273
@Subscribe
@@ -236,6 +237,6 @@ public void afterCommand() {
236237
if (this.workerFactory != null) {
237238
this.workerFactory.setReporter(null);
238239
}
239-
WorkerMultiplexerManager.afterCommandCleanup();
240+
WorkerMultiplexerManager.afterCommand();
240241
}
241242
}

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java

Lines changed: 45 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.ConcurrentHashMap;
3535
import java.util.concurrent.ConcurrentMap;
3636
import java.util.concurrent.Semaphore;
37+
import javax.annotation.Nullable;
3738

3839
/**
3940
* An intermediate worker that sends requests and receives responses from the worker processes.
@@ -48,29 +49,23 @@ public class WorkerMultiplexer extends Thread {
4849
* A map of {@code WorkResponse}s received from the worker process. They are stored in this map
4950
* keyed by the request id until the corresponding {@code WorkerProxy} picks them up.
5051
*/
51-
private final ConcurrentMap<Integer, WorkResponse> workerProcessResponse;
52+
private final ConcurrentMap<Integer, WorkResponse> workerProcessResponse =
53+
new ConcurrentHashMap<>();
5254
/**
5355
* A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code
5456
* WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code
5557
* WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal
5658
* {@code WorkerProxy} that the {@code WorkerResponse} has been received.
5759
*/
58-
private final ConcurrentMap<Integer, Semaphore> responseChecker;
59-
/** The worker process that this WorkerMultiplexer should be talking to. */
60-
private Subprocess process;
60+
private final ConcurrentMap<Integer, Semaphore> responseChecker = new ConcurrentHashMap<>();
6161
/**
62-
* Set to true if one of the worker processes returns an unparseable response, or for other
63-
* reasons we can't properly handle the remaining responses. We then discard all the responses
64-
* from other work requests and abort.
62+
* The worker process that this WorkerMultiplexer should be talking to. This should only be set
63+
* once, when creating a new process. If the process dies or its stdio streams get corrupted, the
64+
* {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed.
6565
*/
66-
private boolean isWorkerStreamCorrupted;
66+
private Subprocess process;
6767
/** InputStream from the worker process. */
6868
private RecordingInputStream recordingStream;
69-
/**
70-
* True if we have received EOF on the stream from the worker process. We then stop processing,
71-
* and all workers still waiting for responses will fail.
72-
*/
73-
private boolean isWorkerStreamClosed;
7469
/** True if this multiplexer was explicitly destroyed. */
7570
private boolean wasDestroyed;
7671
/**
@@ -89,25 +84,20 @@ public class WorkerMultiplexer extends Thread {
8984
* The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared
9085
* at the end of a command execution.
9186
*/
92-
public EventHandler reporter;
87+
private EventHandler reporter;
9388

9489
WorkerMultiplexer(Path logFile, WorkerKey workerKey) {
9590
this.logFile = logFile;
9691
this.workerKey = workerKey;
97-
responseChecker = new ConcurrentHashMap<>();
98-
workerProcessResponse = new ConcurrentHashMap<>();
99-
isWorkerStreamCorrupted = false;
100-
isWorkerStreamClosed = false;
101-
wasDestroyed = false;
10292
}
10393

10494
/** Sets or clears the reporter for outputting verbose info. */
105-
void setReporter(EventHandler reporter) {
95+
synchronized void setReporter(@Nullable EventHandler reporter) {
10696
this.reporter = reporter;
10797
}
10898

10999
/** Reports a string to the user if reporting is enabled. */
110-
private void report(String s) {
100+
private synchronized void report(String s) {
111101
EventHandler r = this.reporter; // Protect against race condition with setReporter().
112102
if (r != null && s != null) {
113103
r.handle(Event.info(s));
@@ -119,17 +109,17 @@ private void report(String s) {
119109
* exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread.
120110
*/
121111
public synchronized void createProcess(Path workDir) throws IOException {
122-
// The process may have died in the meanwhile (e.g. between builds).
123-
if (this.process == null || !this.process.isAlive()) {
112+
if (this.process == null) {
113+
if (this.wasDestroyed) {
114+
throw new IOException("Multiplexer destroyed before created process");
115+
}
124116
ImmutableList<String> args = workerKey.getArgs();
125117
File executable = new File(args.get(0));
126118
if (!executable.isAbsolute() && executable.getParent() != null) {
127119
List<String> newArgs = new ArrayList<>(args);
128120
newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath());
129121
args = ImmutableList.copyOf(newArgs);
130122
}
131-
isWorkerStreamCorrupted = false;
132-
isWorkerStreamClosed = false;
133123
SubprocessBuilder processBuilder =
134124
subprocessFactory != null
135125
? new SubprocessBuilder(subprocessFactory)
@@ -139,6 +129,8 @@ public synchronized void createProcess(Path workDir) throws IOException {
139129
processBuilder.setStderr(logFile.getPathFile());
140130
processBuilder.setEnv(workerKey.getEnv());
141131
this.process = processBuilder.start();
132+
} else if (!this.process.isAlive()) {
133+
throw new IOException("Process is dead");
142134
}
143135
if (!this.isAlive()) {
144136
this.start();
@@ -155,24 +147,24 @@ public Path getLogFile() {
155147

156148
/**
157149
* Signals this object to destroy itself, including the worker process. The object might not be
158-
* fully destroyed at the end of this call, but will terminate soon.
150+
* fully destroyed at the end of this call, but will terminate soon. This is considered a
151+
* deliberate destruction.
159152
*/
160153
public synchronized void destroyMultiplexer() {
161154
if (this.process != null) {
162-
destroyProcess(this.process);
163-
this.process = null;
155+
destroyProcess();
164156
}
165157
wasDestroyed = true;
166158
}
167159

168160
/** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */
169-
private void destroyProcess(Subprocess process) {
161+
private synchronized void destroyProcess() {
170162
boolean wasInterrupted = false;
171163
try {
172-
process.destroy();
164+
this.process.destroy();
173165
while (true) {
174166
try {
175-
process.waitFor();
167+
this.process.waitFor();
176168
return;
177169
} catch (InterruptedException ie) {
178170
wasInterrupted = true;
@@ -183,7 +175,6 @@ private void destroyProcess(Subprocess process) {
183175
if (wasInterrupted) {
184176
Thread.currentThread().interrupt(); // preserve interrupted status
185177
}
186-
isWorkerStreamClosed = true;
187178
}
188179
}
189180

@@ -200,10 +191,6 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
200191
// We can't know how much of the request was sent, so we have to assume the worker's input
201192
// now contains garbage.
202193
// TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread?
203-
isWorkerStreamCorrupted = true;
204-
if (e instanceof InterruptedIOException) {
205-
Thread.currentThread().interrupt();
206-
}
207194
responseChecker.remove(request.getRequestId());
208195
throw e;
209196
}
@@ -228,10 +215,8 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException {
228215
// Wait for the multiplexer to get our response and release this semaphore. The semaphore will
229216
// throw {@code InterruptedException} when the multiplexer is terminated.
230217
waitForResponse.acquire();
231-
report("Acquired response semaphore for " + requestId);
232218

233219
WorkResponse workResponse = workerProcessResponse.get(requestId);
234-
report("Response for " + requestId + " is " + workResponse);
235220
return workResponse;
236221
} finally {
237222
responseChecker.remove(requestId);
@@ -247,25 +232,25 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException {
247232
* execution cancellation.
248233
*/
249234
private void waitResponse() throws InterruptedException, IOException {
250-
Subprocess p = this.process;
251-
if (p == null || !p.isAlive()) {
252-
// Avoid busy-wait for a new process.
235+
recordingStream = new RecordingInputStream(this.process.getInputStream());
236+
recordingStream.startRecording(4096);
237+
// TODO(larsrc): Turn this into a loop that also sends requests.
238+
// Allow interrupts while waiting for responses, without conflating it with I/O errors.
239+
while (recordingStream.available() == 0) {
240+
if (!this.process.isAlive()) {
241+
throw new IOException(
242+
String.format("Multiplexer process for %s is dead", workerKey.getMnemonic()));
243+
}
253244
Thread.sleep(1);
254-
return;
255245
}
256-
recordingStream = new RecordingInputStream(p.getInputStream());
257-
recordingStream.startRecording(4096);
258246
WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
259247

260248
// A null parsedResponse can only happen if the input stream is closed, in which case we
261249
// drop everything.
262250
if (parsedResponse == null) {
263-
isWorkerStreamClosed = true;
264-
report(
251+
throw new IOException(
265252
String.format(
266-
"Multiplexer process for %s has closed its output, aborting multiplexer",
267-
workerKey.getMnemonic()));
268-
return;
253+
"Multiplexer process for %s died while reading response", workerKey.getMnemonic()));
269254
}
270255

271256
int requestId = parsedResponse.getRequestId();
@@ -287,13 +272,15 @@ private void waitResponse() throws InterruptedException, IOException {
287272
/** The multiplexer thread that listens to the WorkResponse from worker process. */
288273
@Override
289274
public void run() {
290-
while (!isWorkerStreamClosed && !isWorkerStreamCorrupted) {
275+
while (this.process.isAlive()) {
291276
try {
292277
waitResponse();
293278
} catch (IOException e) {
294279
// We got this exception while reading from the worker's stdout. We can't trust the
295280
// output any more at that point.
296-
isWorkerStreamCorrupted = true;
281+
if (this.process.isAlive()) {
282+
destroyProcess();
283+
}
297284
if (e instanceof InterruptedIOException) {
298285
report(
299286
String.format(
@@ -315,17 +302,12 @@ public void run() {
315302
// will let fall on the floor, but we still want to leave the process running for the next
316303
// build.
317304
// TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented.
318-
releaseAllSemaphores();
305+
for (Semaphore semaphore : responseChecker.values()) {
306+
semaphore.release();
307+
}
319308
}
320309
}
321-
// If we get here, the worker process is either dead or corrupted. We could attempt to restart
322-
// it, but the outstanding requests will have failed already. Until we have a way to signal
323-
// transient failures, we have to just reject all further requests and make sure the process
324-
// is really dead
325310
synchronized (this) {
326-
if (process != null && process.isAlive()) {
327-
destroyMultiplexer();
328-
}
329311
releaseAllSemaphores();
330312
}
331313
}
@@ -350,14 +332,14 @@ String getRecordingStreamMessage() {
350332

351333
/** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */
352334
boolean diedUnexpectedly() {
353-
Subprocess p = this.process; // Protects against this.process getting null.
354-
return p != null && !p.isAlive() && !wasDestroyed;
335+
return this.process != null && !this.process.isAlive() && !wasDestroyed;
355336
}
356337

357338
/** Returns the exit value of multiplexer's process, if it has exited. */
358339
Optional<Integer> getExitValue() {
359-
Subprocess p = this.process; // Protects against this.process getting null.
360-
return p != null && !p.isAlive() ? Optional.of(p.exitValue()) : Optional.empty();
340+
return this.process != null && !this.process.isAlive()
341+
? Optional.of(this.process.exitValue())
342+
: Optional.empty();
361343
}
362344

363345
/** For testing only, to verify that maps are cleared after responses are reaped. */

0 commit comments

Comments
 (0)