Skip to content

Commit c771c43

Browse files
authored
Remote: Fix crashes by InterruptedException when dynamic execution is enabled. (#15091)
Fixes #14433. The root cause is, inside `RemoteExecutionCache`, the result of `FindMissingDigests` is shared with other threads without considering error handling. For example, if there are two or more threads uploading the same input and one thread got interrupted when waiting for the result of `FindMissingDigests` call, the call is cancelled and others threads still waiting for the upload will receive upload error due to the cancellation which is wrong. This PR fixes this by effectively applying reference count to the result of `FindMissingDigests` call so that if one thread got interrupted, as long as there are other threads depending on the result, the call won't be cancelled and the upload can continue. Closes #15001. PiperOrigin-RevId: 436180205
1 parent 48b60d2 commit c771c43

File tree

4 files changed

+270
-78
lines changed

4 files changed

+270
-78
lines changed

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java

Lines changed: 149 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,39 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote;
1515

16-
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
16+
import static com.google.common.base.Preconditions.checkArgument;
17+
import static com.google.common.base.Preconditions.checkState;
18+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
19+
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
20+
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
21+
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
22+
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
1723
import static java.lang.String.format;
1824

1925
import build.bazel.remote.execution.v2.Digest;
2026
import build.bazel.remote.execution.v2.Directory;
27+
import com.google.common.base.Throwables;
2128
import com.google.common.collect.ImmutableSet;
2229
import com.google.common.util.concurrent.Futures;
2330
import com.google.common.util.concurrent.ListenableFuture;
24-
import com.google.common.util.concurrent.MoreExecutors;
2531
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
2632
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
2733
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
2834
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
2935
import com.google.devtools.build.lib.remote.options.RemoteOptions;
3036
import com.google.devtools.build.lib.remote.util.DigestUtil;
31-
import com.google.devtools.build.lib.remote.util.RxFutures;
37+
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
3238
import com.google.protobuf.Message;
3339
import io.reactivex.rxjava3.core.Completable;
40+
import io.reactivex.rxjava3.core.Flowable;
41+
import io.reactivex.rxjava3.core.Single;
3442
import io.reactivex.rxjava3.subjects.AsyncSubject;
3543
import java.io.IOException;
36-
import java.util.ArrayList;
37-
import java.util.List;
44+
import java.util.HashSet;
3845
import java.util.Map;
39-
import java.util.concurrent.ConcurrentHashMap;
46+
import java.util.Set;
47+
import java.util.concurrent.atomic.AtomicBoolean;
48+
import javax.annotation.concurrent.GuardedBy;
4049

4150
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
4251
public class RemoteExecutionCache extends RemoteCache {
@@ -72,62 +81,58 @@ public void ensureInputsPresent(
7281
.addAll(additionalInputs.keySet())
7382
.build();
7483

75-
// Collect digests that are not being or already uploaded
76-
ConcurrentHashMap<Digest, AsyncSubject<Boolean>> missingDigestSubjects =
77-
new ConcurrentHashMap<>();
78-
79-
List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
80-
for (Digest digest : allDigests) {
81-
Completable upload =
82-
casUploadCache.execute(
83-
digest,
84-
Completable.defer(
85-
() -> {
86-
// The digest hasn't been processed, add it to the collection which will be used
87-
// later for findMissingDigests call
88-
AsyncSubject<Boolean> missingDigestSubject = AsyncSubject.create();
89-
missingDigestSubjects.put(digest, missingDigestSubject);
90-
91-
return missingDigestSubject.flatMapCompletable(
92-
missing -> {
93-
if (!missing) {
94-
return Completable.complete();
95-
}
96-
return RxFutures.toCompletable(
97-
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
98-
MoreExecutors.directExecutor());
99-
});
100-
}),
101-
force);
102-
uploadFutures.add(RxFutures.toListenableFuture(upload));
84+
if (allDigests.isEmpty()) {
85+
return;
10386
}
10487

105-
ImmutableSet<Digest> missingDigests;
106-
try {
107-
missingDigests = getFromFuture(findMissingDigests(context, missingDigestSubjects.keySet()));
108-
} catch (IOException | InterruptedException e) {
109-
for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
110-
entry.getValue().onError(e);
111-
}
88+
MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
89+
Flowable<TransferResult> uploads =
90+
Flowable.fromIterable(allDigests)
91+
.flatMapSingle(
92+
digest ->
93+
uploadBlobIfMissing(
94+
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));
11295

113-
if (e instanceof InterruptedException) {
114-
Thread.currentThread().interrupt();
96+
try {
97+
mergeBulkTransfer(uploads).blockingAwait();
98+
} catch (RuntimeException e) {
99+
Throwable cause = e.getCause();
100+
if (cause != null) {
101+
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
102+
Throwables.throwIfInstanceOf(cause, IOException.class);
115103
}
116104
throw e;
117105
}
106+
}
118107

119-
for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
120-
AsyncSubject<Boolean> missingSubject = entry.getValue();
121-
if (missingDigests.contains(entry.getKey())) {
122-
missingSubject.onNext(true);
123-
} else {
124-
// The digest is already existed in the remote cache, skip the upload.
125-
missingSubject.onNext(false);
126-
}
127-
missingSubject.onComplete();
128-
}
129-
130-
waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
108+
private Single<TransferResult> uploadBlobIfMissing(
109+
RemoteActionExecutionContext context,
110+
MerkleTree merkleTree,
111+
Map<Digest, Message> additionalInputs,
112+
boolean force,
113+
MissingDigestFinder missingDigestFinder,
114+
Digest digest) {
115+
Completable upload =
116+
casUploadCache.execute(
117+
digest,
118+
Completable.defer(
119+
() ->
120+
// Only reach here if the digest is missing and is not being uploaded.
121+
missingDigestFinder
122+
.registerAndCount(digest)
123+
.flatMapCompletable(
124+
missingDigests -> {
125+
if (missingDigests.contains(digest)) {
126+
return toCompletable(
127+
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
128+
directExecutor());
129+
} else {
130+
return Completable.complete();
131+
}
132+
})),
133+
/* onIgnored= */ missingDigestFinder::count,
134+
force);
135+
return toTransferResult(upload);
131136
}
132137

133138
private ListenableFuture<Void> uploadBlob(
@@ -159,4 +164,93 @@ private ListenableFuture<Void> uploadBlob(
159164
"findMissingDigests returned a missing digest that has not been requested: %s",
160165
digest)));
161166
}
167+
168+
/**
169+
* A missing digest finder that initiates the request when the internal counter reaches an
170+
* expected count.
171+
*/
172+
class MissingDigestFinder {
173+
private final int expectedCount;
174+
175+
private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
176+
private final Single<ImmutableSet<Digest>> resultSingle;
177+
178+
@GuardedBy("this")
179+
private final Set<Digest> digests;
180+
181+
@GuardedBy("this")
182+
private int currentCount = 0;
183+
184+
MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
185+
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
186+
this.expectedCount = expectedCount;
187+
this.digestsSubject = AsyncSubject.create();
188+
this.digests = new HashSet<>();
189+
190+
AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
191+
this.resultSingle =
192+
Single.fromObservable(
193+
digestsSubject
194+
.flatMapSingle(
195+
digests -> {
196+
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
197+
// Make sure we don't have re-subscription caused by refCount() below.
198+
checkState(!wasCalled, "FindMissingDigests is called more than once");
199+
return toSingle(
200+
() -> findMissingDigests(context, digests), directExecutor());
201+
})
202+
// Use replay here because we could have a race condition that downstream hasn't
203+
// been added to the subscription list (to receive the upstream result) while
204+
// upstream is completed.
205+
.replay(1)
206+
.refCount());
207+
}
208+
209+
/**
210+
* Register the {@code digest} and increase the counter.
211+
*
212+
* <p>Returned Single cannot be subscribed more than once.
213+
*
214+
* @return Single that emits the result of the {@code FindMissingDigest} request.
215+
*/
216+
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
217+
AtomicBoolean subscribed = new AtomicBoolean(false);
218+
// count() will potentially trigger the findMissingDigests call. Adding and counting before
219+
// returning the Single could introduce a race that the result of findMissingDigests is
220+
// available but the consumer doesn't get it because it hasn't subscribed the returned
221+
// Single. In this case, it subscribes after upstream is completed resulting a re-run of
222+
// findMissingDigests (due to refCount()).
223+
//
224+
// Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
225+
// returned Single to avoid a re-execution of findMissingDigests.
226+
return resultSingle.doOnSubscribe(
227+
d -> {
228+
boolean wasSubscribed = subscribed.getAndSet(true);
229+
checkState(!wasSubscribed, "Single is subscribed more than once");
230+
synchronized (this) {
231+
digests.add(digest);
232+
}
233+
count();
234+
});
235+
}
236+
237+
/** Increase the counter. */
238+
void count() {
239+
ImmutableSet<Digest> digestsResult = null;
240+
241+
synchronized (this) {
242+
if (currentCount < expectedCount) {
243+
currentCount++;
244+
if (currentCount == expectedCount) {
245+
digestsResult = ImmutableSet.copyOf(digests);
246+
}
247+
}
248+
}
249+
250+
if (digestsResult != null) {
251+
digestsSubject.onNext(digestsResult);
252+
digestsSubject.onComplete();
253+
}
254+
}
255+
}
162256
}

src/main/java/com/google/devtools/build/lib/remote/util/AsyncTaskCache.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.rxjava3.core.Single;
2525
import io.reactivex.rxjava3.core.SingleObserver;
2626
import io.reactivex.rxjava3.disposables.Disposable;
27+
import io.reactivex.rxjava3.functions.Action;
2728
import java.util.ArrayList;
2829
import java.util.HashMap;
2930
import java.util.List;
@@ -256,14 +257,25 @@ public boolean isDisposed() {
256257
/**
257258
* Executes a task.
258259
*
260+
* @see #execute(Object, Single, Action, boolean).
261+
*/
262+
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
263+
return execute(key, task, () -> {}, force);
264+
}
265+
266+
/**
267+
* Executes a task. If the task has already finished, this execution of the task is ignored unless
268+
* `force` is true. If the task is in progress this execution of the task is always ignored.
269+
*
259270
* <p>If the cache is already shutdown, a {@link CancellationException} will be emitted.
260271
*
261272
* @param key identifies the task.
273+
* @param onIgnored callback called when provided task is ignored.
262274
* @param force re-execute a finished task if set to {@code true}.
263275
* @return a {@link Single} which turns to completed once the task is finished or propagates the
264276
* error if any.
265277
*/
266-
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
278+
public Single<ValueT> execute(KeyT key, Single<ValueT> task, Action onIgnored, boolean force) {
267279
return Single.create(
268280
emitter -> {
269281
synchronized (lock) {
@@ -273,14 +285,20 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
273285
}
274286

275287
if (!force && finished.containsKey(key)) {
288+
onIgnored.run();
276289
emitter.onSuccess(finished.get(key));
277290
return;
278291
}
279292

280293
finished.remove(key);
281294

282-
Execution execution =
283-
inProgress.computeIfAbsent(key, ignoredKey -> new Execution(key, task));
295+
Execution execution = inProgress.get(key);
296+
if (execution != null) {
297+
onIgnored.run();
298+
} else {
299+
execution = new Execution(key, task);
300+
inProgress.put(key, execution);
301+
}
284302

285303
// We must subscribe the execution within the scope of lock to avoid race condition
286304
// that:
@@ -425,10 +443,15 @@ public Completable executeIfNot(KeyT key, Completable task) {
425443
cache.executeIfNot(key, task.toSingleDefault(Optional.empty())));
426444
}
427445

428-
/** Same as {@link AsyncTaskCache#executeIfNot} but operates on {@link Completable}. */
446+
/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
429447
public Completable execute(KeyT key, Completable task, boolean force) {
448+
return execute(key, task, () -> {}, force);
449+
}
450+
451+
/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
452+
public Completable execute(KeyT key, Completable task, Action onIgnored, boolean force) {
430453
return Completable.fromSingle(
431-
cache.execute(key, task.toSingleDefault(Optional.empty()), force));
454+
cache.execute(key, task.toSingleDefault(Optional.empty()), onIgnored, force));
432455
}
433456

434457
/** Returns a set of keys for tasks which is finished. */

0 commit comments

Comments
 (0)