13
13
// limitations under the License.
14
14
package com .google .devtools .build .lib .worker ;
15
15
16
-
17
16
import com .google .common .annotations .VisibleForTesting ;
18
17
import com .google .devtools .build .lib .worker .WorkerProtocol .WorkRequest ;
19
18
import com .google .devtools .build .lib .worker .WorkerProtocol .WorkResponse ;
24
23
import java .io .StringWriter ;
25
24
import java .lang .management .ManagementFactory ;
26
25
import java .time .Duration ;
27
- import java .util .ArrayDeque ;
28
26
import java .util .List ;
29
- import java .util .Map ;
30
27
import java .util .Optional ;
31
- import java .util .Queue ;
32
28
import java .util .concurrent .ConcurrentHashMap ;
29
+ import java .util .concurrent .ConcurrentMap ;
33
30
import java .util .concurrent .atomic .AtomicReference ;
31
+ import java .util .function .BiConsumer ;
34
32
import java .util .function .BiFunction ;
35
33
36
34
/**
@@ -56,13 +54,31 @@ public interface WorkerMessageProcessor {
56
54
57
55
/** Holds information necessary to properly handle a request, especially for cancellation. */
58
56
static class RequestInfo {
57
+ /** The thread handling the request. */
58
+ final Thread thread ;
59
+ /** If true, we have received a cancel request for this request. */
60
+ private boolean cancelled ;
59
61
/**
60
62
* The builder for the response to this request. Since only one response must be sent per
61
63
* request, this builder must be accessed through takeBuilder(), which zeroes this field and
62
64
* returns the builder.
63
65
*/
64
66
private WorkResponse .Builder responseBuilder = WorkResponse .newBuilder ();
65
67
68
+ RequestInfo (Thread thread ) {
69
+ this .thread = thread ;
70
+ }
71
+
72
+ /** Sets whether this request has been cancelled. */
73
+ void setCancelled () {
74
+ cancelled = true ;
75
+ }
76
+
77
+ /** Returns true if this request has been cancelled. */
78
+ boolean isCancelled () {
79
+ return cancelled ;
80
+ }
81
+
66
82
/**
67
83
* Returns the response builder. If called more than once on the same instance, subsequent calls
68
84
* will return {@code null}.
@@ -72,13 +88,22 @@ synchronized Optional<WorkResponse.Builder> takeBuilder() {
72
88
responseBuilder = null ;
73
89
return Optional .ofNullable (b );
74
90
}
91
+
92
+ /**
93
+ * Adds {@code s} as output to when the response eventually gets built. Does nothing if the
94
+ * response has already been taken. There is no guarantee that the response hasn't already been
95
+ * taken, making this call a no-op. This may be called multiple times. No delimiters are added
96
+ * between strings from multiple calls.
97
+ */
98
+ synchronized void addOutput (String s ) {
99
+ if (responseBuilder != null ) {
100
+ responseBuilder .setOutput (responseBuilder .getOutput () + s );
101
+ }
102
+ }
75
103
}
76
104
77
105
/** Requests that are currently being processed. Visible for testing. */
78
- final Map <Integer , RequestInfo > activeRequests = new ConcurrentHashMap <>();
79
-
80
- /** WorkRequests that have been received but could not be processed yet. */
81
- private final Queue <WorkRequest > availableRequests = new ArrayDeque <>();
106
+ final ConcurrentMap <Integer , RequestInfo > activeRequests = new ConcurrentHashMap <>();
82
107
83
108
/** The function to be called after each {@link WorkRequest} is read. */
84
109
private final BiFunction <List <String >, PrintWriter , Integer > callback ;
@@ -88,6 +113,7 @@ synchronized Optional<WorkResponse.Builder> takeBuilder() {
88
113
89
114
final WorkerMessageProcessor messageProcessor ;
90
115
116
+ private final BiConsumer <Integer , Thread > cancelCallback ;
91
117
92
118
private final CpuTimeBasedGcScheduler gcScheduler ;
93
119
@@ -107,7 +133,7 @@ public WorkRequestHandler(
107
133
BiFunction <List <String >, PrintWriter , Integer > callback ,
108
134
PrintStream stderr ,
109
135
WorkerMessageProcessor messageProcessor ) {
110
- this (callback , stderr , messageProcessor , Duration .ZERO );
136
+ this (callback , stderr , messageProcessor , Duration .ZERO , null );
111
137
}
112
138
113
139
/**
@@ -131,10 +157,24 @@ public WorkRequestHandler(
131
157
PrintStream stderr ,
132
158
WorkerMessageProcessor messageProcessor ,
133
159
Duration cpuUsageBeforeGc ) {
160
+ this (callback , stderr , messageProcessor , cpuUsageBeforeGc , null );
161
+ }
162
+
163
+ /**
164
+ * Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
165
+ * received. Only used for the Builder.
166
+ */
167
+ private WorkRequestHandler (
168
+ BiFunction <List <String >, PrintWriter , Integer > callback ,
169
+ PrintStream stderr ,
170
+ WorkerMessageProcessor messageProcessor ,
171
+ Duration cpuUsageBeforeGc ,
172
+ BiConsumer <Integer , Thread > cancelCallback ) {
134
173
this .callback = callback ;
135
174
this .stderr = stderr ;
136
175
this .messageProcessor = messageProcessor ;
137
176
this .gcScheduler = new CpuTimeBasedGcScheduler (cpuUsageBeforeGc );
177
+ this .cancelCallback = cancelCallback ;
138
178
}
139
179
140
180
/** Builder class for WorkRequestHandler. Required parameters are passed to the constructor. */
@@ -143,6 +183,7 @@ public static class WorkRequestHandlerBuilder {
143
183
private final PrintStream stderr ;
144
184
private final WorkerMessageProcessor messageProcessor ;
145
185
private Duration cpuUsageBeforeGc = Duration .ZERO ;
186
+ private BiConsumer <Integer , Thread > cancelCallback ;
146
187
147
188
/**
148
189
* Creates a {@code WorkRequestHandlerBuilder}.
@@ -173,9 +214,19 @@ public WorkRequestHandlerBuilder setCpuUsageBeforeGc(Duration cpuUsageBeforeGc)
173
214
return this ;
174
215
}
175
216
217
+ /**
218
+ * Sets a callback will be called when a cancellation message has been received. The callback
219
+ * will be call with the request ID and the thread executing the request.
220
+ */
221
+ public WorkRequestHandlerBuilder setCancelCallback (BiConsumer <Integer , Thread > cancelCallback ) {
222
+ this .cancelCallback = cancelCallback ;
223
+ return this ;
224
+ }
225
+
176
226
/** Returns a WorkRequestHandler instance with the values in this Builder. */
177
227
public WorkRequestHandler build () {
178
- return new WorkRequestHandler (callback , stderr , messageProcessor , cpuUsageBeforeGc );
228
+ return new WorkRequestHandler (
229
+ callback , stderr , messageProcessor , cpuUsageBeforeGc , cancelCallback );
179
230
}
180
231
}
181
232
@@ -191,56 +242,42 @@ public void processRequests() throws IOException {
191
242
if (request == null ) {
192
243
break ;
193
244
}
194
- availableRequests .add (request );
195
- startRequestThreads ();
196
- }
197
- }
198
-
199
- /**
200
- * Starts threads for as many outstanding requests as possible. This is the only method that adds
201
- * to {@code activeRequests}.
202
- */
203
- private synchronized void startRequestThreads () {
204
- while (!availableRequests .isEmpty ()) {
205
- // If there's a singleplex request in process, don't start more processes.
206
- if (activeRequests .containsKey (0 )) {
207
- return ;
245
+ if (request .getCancel ()) {
246
+ respondToCancelRequest (request );
247
+ } else {
248
+ startResponseThread (request );
208
249
}
209
- WorkRequest request = availableRequests .peek ();
210
- // Don't start new singleplex requests if there are other requests running.
211
- if (request .getRequestId () == 0 && !activeRequests .isEmpty ()) {
212
- return ;
213
- }
214
- availableRequests .remove ();
215
- Thread t = createResponseThread (request );
216
- activeRequests .put (request .getRequestId (), new RequestInfo ());
217
- t .start ();
218
250
}
219
251
}
220
252
221
- /** Creates a new {@link Thread} to process a multiplex request. */
222
- Thread createResponseThread (WorkRequest request ) {
253
+ /** Starts a thread for the given request. */
254
+ void startResponseThread (WorkRequest request ) {
223
255
Thread currentThread = Thread .currentThread ();
224
256
String threadName =
225
257
request .getRequestId () > 0
226
258
? "multiplex-request-" + request .getRequestId ()
227
259
: "singleplex-request" ;
228
- return new Thread (
229
- () -> {
230
- RequestInfo requestInfo = activeRequests .get (request .getRequestId ());
231
- try {
232
- respondToRequest (request , requestInfo );
233
- } catch (IOException e ) {
234
- e .printStackTrace (stderr );
235
- // In case of error, shut down the entire worker.
236
- currentThread .interrupt ();
237
- } finally {
238
- activeRequests .remove (request .getRequestId ());
239
- // A good time to start more requests, especially if we finished a singleplex request
240
- startRequestThreads ();
241
- }
242
- },
243
- threadName );
260
+ Thread t =
261
+ new Thread (
262
+ () -> {
263
+ RequestInfo requestInfo = activeRequests .get (request .getRequestId ());
264
+ if (requestInfo == null ) {
265
+ // Already cancelled
266
+ return ;
267
+ }
268
+ try {
269
+ respondToRequest (request , requestInfo );
270
+ } catch (IOException e ) {
271
+ e .printStackTrace (stderr );
272
+ // In case of error, shut down the entire worker.
273
+ currentThread .interrupt ();
274
+ } finally {
275
+ activeRequests .remove (request .getRequestId ());
276
+ }
277
+ },
278
+ threadName );
279
+ activeRequests .put (request .getRequestId (), new RequestInfo (t ));
280
+ t .start ();
244
281
}
245
282
246
283
/** Handles and responds to the given {@link WorkRequest}. */
@@ -260,7 +297,11 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
260
297
if (optBuilder .isPresent ()) {
261
298
WorkResponse .Builder builder = optBuilder .get ();
262
299
builder .setRequestId (request .getRequestId ());
263
- builder .setOutput (builder .getOutput () + sw .toString ()).setExitCode (exitCode );
300
+ if (requestInfo .isCancelled ()) {
301
+ builder .setWasCancelled (true );
302
+ } else {
303
+ builder .setOutput (builder .getOutput () + sw ).setExitCode (exitCode );
304
+ }
264
305
WorkResponse response = builder .build ();
265
306
synchronized (this ) {
266
307
messageProcessor .writeWorkResponse (response );
@@ -270,6 +311,45 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
270
311
}
271
312
}
272
313
314
+ /**
315
+ * Handles cancelling an existing request, including sending a response if that is not done by the
316
+ * time {@code cancelCallback.accept} returns.
317
+ */
318
+ void respondToCancelRequest (WorkRequest request ) throws IOException {
319
+ // Theoretically, we could have gotten two singleplex requests, and we can't tell those apart.
320
+ // However, that's a violation of the protocol, so we don't try to handle it (not least because
321
+ // handling it would be quite error-prone).
322
+ RequestInfo ri = activeRequests .remove (request .getRequestId ());
323
+
324
+ if (ri == null ) {
325
+ return ;
326
+ }
327
+ if (cancelCallback == null ) {
328
+ ri .setCancelled ();
329
+ // This is either an error on the server side or a version mismatch between the server setup
330
+ // and the binary. It's better to wait for the regular work to finish instead of breaking the
331
+ // build, but we should inform the user about the bad setup.
332
+ ri .addOutput (
333
+ String .format (
334
+ "Cancellation request received for worker request %d, but this worker does not"
335
+ + " support cancellation.\n " ,
336
+ request .getRequestId ()));
337
+ } else {
338
+ if (ri .thread .isAlive () && !ri .isCancelled ()) {
339
+ ri .setCancelled ();
340
+ cancelCallback .accept (request .getRequestId (), ri .thread );
341
+ Optional <WorkResponse .Builder > builder = ri .takeBuilder ();
342
+ if (builder .isPresent ()) {
343
+ WorkResponse response =
344
+ builder .get ().setWasCancelled (true ).setRequestId (request .getRequestId ()).build ();
345
+ synchronized (this ) {
346
+ messageProcessor .writeWorkResponse (response );
347
+ }
348
+ }
349
+ }
350
+ }
351
+ }
352
+
273
353
@ Override
274
354
public void close () throws IOException {
275
355
messageProcessor .close ();
0 commit comments