Skip to content

Commit e7a0a71

Browse files
larsrc-googlecopybara-github
authored andcommitted
More properly destroy workers on interrupt.
Interrupts did not wake the workers out of waiting for a response. When running under dynamic execution, that's all we can do until cancellation is implemented. For workers without dynamic execution, the only interrupt is from the build itself getting interrupted, and in that case we don't want to wait - it could take minutes. Instead we then use a busy-wait to notice interrupts and destroy the worker if interrupted. This will be useful even when cancellation is implemented, since not all workers may be able to implement a sensible cancellation. This also renames mustBeSandboxed on WorkerKey to isSpeculative to separate cause and effect. RELNOTES: None. PiperOrigin-RevId: 361802760
1 parent c0ac45c commit e7a0a71

File tree

11 files changed

+119
-165
lines changed

11 files changed

+119
-165
lines changed

src/main/java/com/google/devtools/build/lib/worker/JsonWorkerProtocol.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@ public void putRequest(WorkRequest request) throws IOException {
5454

5555
@Override
5656
public WorkResponse getResponse() throws IOException {
57+
boolean interrupted = Thread.interrupted();
58+
try {
59+
return parseResponse();
60+
} finally {
61+
if (interrupted) {
62+
Thread.currentThread().interrupt();
63+
}
64+
}
65+
}
66+
67+
private WorkResponse parseResponse() throws IOException {
5768
Integer exitCode = null;
5869
String output = null;
5970
Integer requestId = null;

src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerProtocol.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,14 @@ public void putRequest(WorkRequest request) throws IOException {
4141

4242
@Override
4343
public WorkResponse getResponse() throws IOException {
44-
return WorkResponse.parseDelimitedFrom(workersStdout);
44+
boolean interrupted = Thread.interrupted();
45+
try {
46+
return WorkResponse.parseDelimitedFrom(workersStdout);
47+
} finally {
48+
if (interrupted) {
49+
Thread.currentThread().interrupt();
50+
}
51+
}
4552
}
4653

4754
@Override

src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,28 @@ void putRequest(WorkRequest request) throws IOException {
118118
}
119119

120120
@Override
121-
WorkResponse getResponse(int requestId) throws IOException {
121+
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
122122
recordingInputStream.startRecording(4096);
123+
// Ironically, we don't allow interrupts during dynamic execution, since we can't cancel
124+
// the worker short of destroying it.
125+
if (!workerKey.isSpeculative()) {
126+
while (recordingInputStream.available() == 0) {
127+
try {
128+
Thread.sleep(10);
129+
} catch (InterruptedException e) {
130+
// This should only happen when not in dynamic execution, so we can safely kill the
131+
// worker.
132+
destroy();
133+
throw e;
134+
}
135+
if (!process.isAlive()) {
136+
throw new IOException(
137+
String.format(
138+
"Worker process for %s died while waiting for response",
139+
workerKey.getMnemonic()));
140+
}
141+
}
142+
}
123143
return workerProtocol.getResponse();
124144
}
125145

src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Worker create(WorkerKey key) {
6060
workerBaseDir.getRelative(workTypeName + "-" + workerId + "-" + key.getMnemonic() + ".log");
6161

6262
Worker worker;
63-
boolean sandboxed = workerOptions.workerSandboxing || key.mustBeSandboxed();
63+
boolean sandboxed = workerOptions.workerSandboxing || key.isSpeculative();
6464
if (sandboxed) {
6565
Path workDir = getSandboxedWorkerPath(key, workerId);
6666
worker = new SandboxedWorker(key, workerId, workDir, logFile);
@@ -124,30 +124,18 @@ public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
124124
Worker worker = p.getObject();
125125
Optional<Integer> exitValue = worker.getExitValue();
126126
if (exitValue.isPresent()) {
127-
if (workerOptions.workerVerbose) {
128-
if (worker.diedUnexpectedly()) {
129-
String msg =
130-
String.format(
131-
"%s %s (id %d) has unexpectedly died with exit code %d.",
132-
key.getMnemonic(),
133-
key.getWorkerTypeName(),
134-
worker.getWorkerId(),
135-
exitValue.get());
136-
ErrorMessage errorMessage =
137-
ErrorMessage.builder()
138-
.message(msg)
139-
.logFile(worker.getLogFile())
140-
.logSizeLimit(4096)
141-
.build();
142-
reporter.handle(Event.warn(errorMessage.toString()));
143-
} else {
144-
// Can't rule this out entirely, but it's not an unexpected death.
145-
String msg =
146-
String.format(
147-
"%s %s (id %d) was destroyed, but is still in the worker pool.",
148-
key.getMnemonic(), key.getWorkerTypeName(), worker.getWorkerId());
149-
reporter.handle(Event.info(msg));
150-
}
127+
if (workerOptions.workerVerbose && worker.diedUnexpectedly()) {
128+
String msg =
129+
String.format(
130+
"%s %s (id %d) has unexpectedly died with exit code %d.",
131+
key.getMnemonic(), key.getWorkerTypeName(), worker.getWorkerId(), exitValue.get());
132+
ErrorMessage errorMessage =
133+
ErrorMessage.builder()
134+
.message(msg)
135+
.logFile(worker.getLogFile())
136+
.logSizeLimit(4096)
137+
.build();
138+
reporter.handle(Event.warn(errorMessage.toString()));
151139
}
152140
return false;
153141
}

src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,13 @@
3232
* break various things as well as render the workers less useful.
3333
*/
3434
final class WorkerKey {
35+
/** Build options. */
3536
private final ImmutableList<String> args;
37+
/** Environment variables. */
3638
private final ImmutableMap<String, String> env;
39+
/** Execution root of Bazel process. */
3740
private final Path execRoot;
41+
/** Mnemonic of the worker. */
3842
private final String mnemonic;
3943

4044
/**
@@ -43,16 +47,18 @@ final class WorkerKey {
4347
* methods.
4448
*/
4549
private final HashCode workerFilesCombinedHash;
50+
/** Worker files with the corresponding hash code. */
4651
private final SortedMap<PathFragment, HashCode> workerFilesWithHashes;
47-
private final boolean mustBeSandboxed;
52+
/** Set it to true if this job is running speculatively and thus likely to be interrupted. */
53+
private final boolean isSpeculative;
4854
/** A WorkerProxy will be instantiated if true, instantiate a regular Worker if false. */
4955
private final boolean proxied;
5056
/**
5157
* Cached value for the hash of this key, because the value is expensive to calculate
5258
* (ImmutableMap and ImmutableList do not cache their hashcodes.
5359
*/
5460
private final int hash;
55-
61+
/** The format of the worker protocol sent to and read from the worker. */
5662
private final WorkerProtocolFormat protocolFormat;
5763

5864
WorkerKey(
@@ -62,26 +68,17 @@ final class WorkerKey {
6268
String mnemonic,
6369
HashCode workerFilesCombinedHash,
6470
SortedMap<PathFragment, HashCode> workerFilesWithHashes,
65-
boolean mustBeSandboxed,
71+
boolean isSpeculative,
6672
boolean proxied,
6773
WorkerProtocolFormat protocolFormat) {
68-
/** Build options. */
6974
this.args = Preconditions.checkNotNull(args);
70-
/** Environment variables. */
7175
this.env = Preconditions.checkNotNull(env);
72-
/** Execution root of Bazel process. */
7376
this.execRoot = Preconditions.checkNotNull(execRoot);
74-
/** Mnemonic of the worker. */
7577
this.mnemonic = Preconditions.checkNotNull(mnemonic);
76-
/** One combined hash code for all files. */
7778
this.workerFilesCombinedHash = Preconditions.checkNotNull(workerFilesCombinedHash);
78-
/** Worker files with the corresponding hash code. */
7979
this.workerFilesWithHashes = Preconditions.checkNotNull(workerFilesWithHashes);
80-
/** Set it to true if this job should be run in sandbox. */
81-
this.mustBeSandboxed = mustBeSandboxed;
82-
/** Set it to true if this job should be run with WorkerProxy. */
80+
this.isSpeculative = isSpeculative;
8381
this.proxied = proxied;
84-
/** The format of the worker protocol sent to and read from the worker. */
8582
this.protocolFormat = protocolFormat;
8683

8784
hash = calculateHashCode();
@@ -117,9 +114,9 @@ public SortedMap<PathFragment, HashCode> getWorkerFilesWithHashes() {
117114
return workerFilesWithHashes;
118115
}
119116

120-
/** Getter function for variable mustBeSandboxed. */
121-
public boolean mustBeSandboxed() {
122-
return mustBeSandboxed;
117+
/** Returns true if workers are run speculatively. */
118+
public boolean isSpeculative() {
119+
return isSpeculative;
123120
}
124121

125122
/** Getter function for variable proxied. */
@@ -128,7 +125,7 @@ public boolean getProxied() {
128125
}
129126

130127
public boolean isMultiplex() {
131-
return getProxied() && !mustBeSandboxed;
128+
return getProxied() && !isSpeculative;
132129
}
133130

134131
/** Returns the format of the worker protocol. */
@@ -147,7 +144,7 @@ public static String makeWorkerTypeName(boolean proxied, boolean mustBeSandboxed
147144

148145
/** Returns a user-friendly name for this worker type. */
149146
public String getWorkerTypeName() {
150-
return makeWorkerTypeName(proxied, mustBeSandboxed);
147+
return makeWorkerTypeName(proxied, isSpeculative);
151148
}
152149

153150
@Override

src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.common.collect.ImmutableMap;
2121
import com.google.common.eventbus.Subscribe;
2222
import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
23-
import com.google.devtools.build.lib.buildtool.buildevent.BuildInterruptedEvent;
2423
import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent;
2524
import com.google.devtools.build.lib.events.Event;
2625
import com.google.devtools.build.lib.exec.ExecutionOptions;
@@ -72,7 +71,8 @@ public void cleanStarting(CleanStartingEvent event) {
7271
this.options = event.getOptionsProvider().getOptions(WorkerOptions.class);
7372
workerFactory.setReporter(env.getReporter());
7473
workerFactory.setOptions(options);
75-
shutdownPool("Clean command is running, shutting down worker pool...");
74+
shutdownPool(
75+
"Clean command is running, shutting down worker pool...", /* alwaysLog= */ false);
7676
}
7777
}
7878

@@ -179,31 +179,10 @@ public void registerSpawnStrategies(
179179
@Subscribe
180180
public void buildComplete(BuildCompleteEvent event) {
181181
if (options != null && options.workerQuitAfterBuild) {
182-
shutdownPool("Build completed, shutting down worker pool...");
183-
}
184-
}
185-
186-
/**
187-
* Stops any workers that are still executing.
188-
*
189-
* <p>This currently kills off some amount of workers, losing the warmed-up state.
190-
* TODO(b/119701157): Cancel running workers instead (requires some way to reach each worker).
191-
*/
192-
@Subscribe
193-
public void buildInterrupted(BuildInterruptedEvent event) {
194-
if (workerPool != null) {
195-
if ((options != null && options.workerVerbose)) {
196-
env.getReporter().handle(Event.info("Build interrupted, stopping active workers..."));
197-
}
198-
workerPool.stopWork();
182+
shutdownPool("Build completed, shutting down worker pool...", /* alwaysLog= */ false);
199183
}
200184
}
201185

202-
/** Shuts down the worker pool and sets {#code workerPool} to null. */
203-
private void shutdownPool(String reason) {
204-
shutdownPool(reason, /* alwaysLog= */ false);
205-
}
206-
207186
/** Shuts down the worker pool and sets {#code workerPool} to null. */
208187
private void shutdownPool(String reason, boolean alwaysLog) {
209188
Preconditions.checkArgument(!reason.isEmpty());

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,8 @@ private boolean sendRequest() {
314314
*
315315
* <p>This is only called on the readResponses subthread and so cannot be interrupted by dynamic
316316
* execution cancellation, but only by a call to {@link #destroyProcess()}.
317+
*
318+
* @return True if the worker is still in a consistent state.
317319
*/
318320
private boolean readResponse() {
319321
WorkResponse parsedResponse;

src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -189,28 +189,12 @@ private void waitForHighPriorityWorkersToFinish() throws InterruptedException {
189189
}
190190
}
191191

192+
/**
193+
* Closes all the worker pools, destroying the workers in the process. This waits for any
194+
* currently-ongoing work to finish.
195+
*/
192196
public void close() {
193197
workerPools.values().forEach(GenericKeyedObjectPool::close);
194198
multiplexPools.values().forEach(GenericKeyedObjectPool::close);
195199
}
196-
197-
/** Stops any ongoing work in the worker pools. This may entail killing the worker processes. */
198-
public void stopWork() {
199-
workerPools
200-
.values()
201-
.forEach(
202-
pool -> {
203-
if (pool.getNumActive() > 0) {
204-
pool.clear();
205-
}
206-
});
207-
multiplexPools
208-
.values()
209-
.forEach(
210-
pool -> {
211-
if (pool.getNumActive() > 0) {
212-
pool.clear();
213-
}
214-
});
215-
}
216200
}

src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,6 @@ WorkResponse execInWorker(
384384
Worker worker = null;
385385
WorkResponse response;
386386
WorkRequest request;
387-
388387
ActionExecutionMetadata owner = spawn.getResourceOwner();
389388
try {
390389
Stopwatch setupInputsStopwatch = Stopwatch.createStarted();

0 commit comments

Comments
 (0)