Skip to content

Commit 14895f0

Browse files
authored
Wait for outputs downloads before emitting local BEP events that reference these outputs. (#18815)
This is a workaround for 6.3.0. At HEAD, we download these outputs during action execution so there is no need to wait when emitting events. Fixes #18333
1 parent 3a8c076 commit 14895f0

13 files changed

+184
-22
lines changed

src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
3939
import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
4040
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
41+
import com.google.devtools.build.lib.buildeventstream.BuildEventLocalFileSynchronizer;
4142
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
4243
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
4344
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
@@ -90,6 +91,7 @@
9091
import java.util.concurrent.ScheduledFuture;
9192
import java.util.concurrent.TimeUnit;
9293
import java.util.concurrent.TimeoutException;
94+
import java.util.stream.Collectors;
9395
import javax.annotation.Nullable;
9496

9597
/**
@@ -753,6 +755,19 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
753755
CountingArtifactGroupNamer artifactGroupNamer)
754756
throws IOException {
755757
ImmutableSet.Builder<BuildEventTransport> bepTransportsBuilder = new ImmutableSet.Builder<>();
758+
BuildEventLocalFileSynchronizer synchronizer =
759+
localFiles -> {
760+
var outputService = cmdEnv.getOutputService();
761+
if (outputService == null || localFiles.isEmpty()) {
762+
return Futures.immediateVoidFuture();
763+
}
764+
765+
var files =
766+
localFiles.stream()
767+
.map(localFile -> localFile.path.asFragment())
768+
.collect(Collectors.toList());
769+
return outputService.waitOutputDownloads(files);
770+
};
756771

757772
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventTextFile)) {
758773
try {
@@ -766,7 +781,11 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
766781
: new LocalFilesArtifactUploader();
767782
bepTransportsBuilder.add(
768783
new TextFormatFileTransport(
769-
bepTextOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
784+
bepTextOutputStream,
785+
bepOptions,
786+
localFileUploader,
787+
artifactGroupNamer,
788+
synchronizer));
770789
} catch (IOException exception) {
771790
// TODO(b/125216340): Consider making this a warning instead of an error once the
772791
// associated bug has been resolved.
@@ -793,7 +812,11 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
793812
: new LocalFilesArtifactUploader();
794813
bepTransportsBuilder.add(
795814
new BinaryFormatFileTransport(
796-
bepBinaryOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
815+
bepBinaryOutputStream,
816+
bepOptions,
817+
localFileUploader,
818+
artifactGroupNamer,
819+
synchronizer));
797820
} catch (IOException exception) {
798821
// TODO(b/125216340): Consider making this a warning instead of an error once the
799822
// associated bug has been resolved.
@@ -819,7 +842,11 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
819842
: new LocalFilesArtifactUploader();
820843
bepTransportsBuilder.add(
821844
new JsonFormatFileTransport(
822-
bepJsonOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
845+
bepJsonOutputStream,
846+
bepOptions,
847+
localFileUploader,
848+
artifactGroupNamer,
849+
synchronizer));
823850
} catch (IOException exception) {
824851
// TODO(b/125216340): Consider making this a warning instead of an error once the
825852
// associated bug has been resolved.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2023 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.buildeventstream;
15+
16+
import com.google.common.util.concurrent.Futures;
17+
import com.google.common.util.concurrent.ListenableFuture;
18+
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
19+
import java.util.Collection;
20+
21+
/**
22+
* An interface that is used to wait for downloads of remote outputs before sending out local BEP
23+
* events that reference these outputs.
24+
*/
25+
public interface BuildEventLocalFileSynchronizer {
26+
BuildEventLocalFileSynchronizer NO_OP = localFiles -> Futures.immediateVoidFuture();
27+
28+
ListenableFuture<Void> waitForLocalFileDownloads(Collection<LocalFile> localFiles);
29+
}

src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
1818
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
1919
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
20+
import com.google.devtools.build.lib.buildeventstream.BuildEventLocalFileSynchronizer;
2021
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
2122
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
2223
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
@@ -34,8 +35,9 @@ public BinaryFormatFileTransport(
3435
BufferedOutputStream outputStream,
3536
BuildEventProtocolOptions options,
3637
BuildEventArtifactUploader uploader,
37-
ArtifactGroupNamer namer) {
38-
super(outputStream, options, uploader, namer);
38+
ArtifactGroupNamer namer,
39+
BuildEventLocalFileSynchronizer synchronizer) {
40+
super(outputStream, options, uploader, namer, synchronizer);
3941
}
4042

4143
@Override

src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
2929
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
3030
import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
31+
import com.google.devtools.build.lib.buildeventstream.BuildEventLocalFileSynchronizer;
3132
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
3233
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
3334
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
@@ -66,6 +67,7 @@ abstract class FileTransport implements BuildEventTransport {
6667
private final BuildEventArtifactUploader uploader;
6768
private final SequentialWriter writer;
6869
private final ArtifactGroupNamer namer;
70+
private final BuildEventLocalFileSynchronizer synchronizer;
6971

7072
private final ScheduledExecutorService timeoutExecutor =
7173
MoreExecutors.listeningDecorator(
@@ -76,12 +78,14 @@ abstract class FileTransport implements BuildEventTransport {
7678
BufferedOutputStream outputStream,
7779
BuildEventProtocolOptions options,
7880
BuildEventArtifactUploader uploader,
79-
ArtifactGroupNamer namer) {
81+
ArtifactGroupNamer namer,
82+
BuildEventLocalFileSynchronizer synchronizer) {
8083
this.uploader = uploader;
8184
this.options = options;
8285
this.writer =
8386
new SequentialWriter(outputStream, this::serializeEvent, uploader, timeoutExecutor);
8487
this.namer = namer;
88+
this.synchronizer = synchronizer;
8589
}
8690

8791
@ThreadSafe
@@ -280,12 +284,14 @@ private ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto(
280284
BuildEvent event, ArtifactGroupNamer namer) {
281285
checkNotNull(event);
282286

287+
var localFiles = event.referencedLocalFiles();
288+
ListenableFuture<?> localFileDownloads = synchronizer.waitForLocalFileDownloads(localFiles);
283289
ListenableFuture<PathConverter> converterFuture =
284-
uploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
290+
uploader.uploadReferencedLocalFiles(localFiles);
285291
ListenableFuture<?> remoteUploads =
286292
uploader.waitForRemoteUploads(event.remoteUploads(), timeoutExecutor);
287293
return Futures.transform(
288-
Futures.allAsList(converterFuture, remoteUploads),
294+
Futures.allAsList(localFileDownloads, converterFuture, remoteUploads),
289295
results -> {
290296
BuildEventContext context =
291297
new BuildEventContext() {

src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
2020
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
21+
import com.google.devtools.build.lib.buildeventstream.BuildEventLocalFileSynchronizer;
2122
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
2223
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
2324
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
@@ -34,8 +35,9 @@ public JsonFormatFileTransport(
3435
BufferedOutputStream outputStream,
3536
BuildEventProtocolOptions options,
3637
BuildEventArtifactUploader uploader,
37-
ArtifactGroupNamer namer) {
38-
super(outputStream, options, uploader, namer);
38+
ArtifactGroupNamer namer,
39+
BuildEventLocalFileSynchronizer synchronizer) {
40+
super(outputStream, options, uploader, namer, synchronizer);
3941
}
4042

4143
@Override

src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
2020
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
21+
import com.google.devtools.build.lib.buildeventstream.BuildEventLocalFileSynchronizer;
2122
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
2223
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
2324
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
@@ -35,8 +36,9 @@ public TextFormatFileTransport(
3536
BufferedOutputStream outputStream,
3637
BuildEventProtocolOptions options,
3738
BuildEventArtifactUploader uploader,
38-
ArtifactGroupNamer namer) {
39-
super(outputStream, options, uploader, namer);
39+
ArtifactGroupNamer namer,
40+
BuildEventLocalFileSynchronizer synchronizer) {
41+
super(outputStream, options, uploader, namer, synchronizer);
4042
}
4143

4244
@Override

src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.google.devtools.build.lib.events.Reporter;
4949
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
5050
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
51+
import com.google.devtools.build.lib.remote.util.RxFutures;
5152
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
5253
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
5354
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -60,13 +61,15 @@
6061
import java.util.ArrayDeque;
6162
import java.util.ArrayList;
6263
import java.util.Arrays;
64+
import java.util.Collection;
6365
import java.util.Deque;
6466
import java.util.List;
6567
import java.util.Set;
6668
import java.util.concurrent.ConcurrentHashMap;
6769
import java.util.concurrent.atomic.AtomicBoolean;
6870
import java.util.concurrent.atomic.AtomicReference;
6971
import java.util.regex.Pattern;
72+
import java.util.stream.Collectors;
7073
import javax.annotation.Nullable;
7174

7275
/**
@@ -760,6 +763,12 @@ public void flushOutputTree() throws InterruptedException {
760763
downloadCache.awaitInProgressTasks();
761764
}
762765

766+
public ListenableFuture<Void> waitDownloads(Collection<PathFragment> files) {
767+
var convertedFiles = files.stream().map(file -> execRoot.getFileSystem().getPath(file)).collect(
768+
Collectors.toList());
769+
return RxFutures.toListenableFuture(downloadCache.waitInProgressTasks(convertedFiles));
770+
}
771+
763772
public ImmutableSet<ActionInput> getMissingActionInputs() {
764773
return ImmutableSet.copyOf(missingActionInputs);
765774
}

src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.common.collect.ImmutableList;
2020
import com.google.common.collect.ImmutableMap;
2121
import com.google.common.eventbus.Subscribe;
22+
import com.google.common.util.concurrent.Futures;
23+
import com.google.common.util.concurrent.ListenableFuture;
2224
import com.google.devtools.build.lib.actions.Action;
2325
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
2426
import com.google.devtools.build.lib.actions.ActionInputMap;
@@ -34,10 +36,12 @@
3436
import com.google.devtools.build.lib.vfs.FileSystem;
3537
import com.google.devtools.build.lib.vfs.ModifiedFileSet;
3638
import com.google.devtools.build.lib.vfs.OutputService;
39+
import com.google.devtools.build.lib.vfs.Path;
3740
import com.google.devtools.build.lib.vfs.PathFragment;
3841
import com.google.devtools.build.lib.vfs.Root;
3942
import com.google.devtools.build.skyframe.SkyFunction.Environment;
4043
import java.io.IOException;
44+
import java.util.Collection;
4145
import java.util.Map;
4246
import java.util.UUID;
4347
import javax.annotation.Nullable;
@@ -111,6 +115,13 @@ public void flushOutputTree() throws InterruptedException {
111115
}
112116
}
113117

118+
public ListenableFuture<Void> waitOutputDownloads(Collection<PathFragment> files) {
119+
if (actionInputFetcher != null) {
120+
return actionInputFetcher.waitDownloads(files);
121+
}
122+
return Futures.immediateVoidFuture();
123+
}
124+
114125
@Override
115126
public void finalizeBuild(boolean buildSuccessful) {
116127
// Intentionally left empty.

src/main/java/com/google/devtools/build/lib/remote/util/AsyncTaskCache.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.reactivex.rxjava3.functions.Action;
2929
import io.reactivex.rxjava3.subjects.AsyncSubject;
3030
import java.util.ArrayList;
31+
import java.util.Collection;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
@@ -357,6 +358,32 @@ public void shutdown() {
357358
}
358359
}
359360

361+
/**
362+
* Returns a {@link Completable} which will completes once the in-progress tasks identified by
363+
* {@code keys} are completed. Tasks submitted after the call are not waited.
364+
*/
365+
public Completable waitInProgressTasks(Collection<KeyT> keys) {
366+
return Completable.defer(
367+
() -> {
368+
List<Execution> executions = new ArrayList<>();
369+
synchronized (lock) {
370+
for (var key : keys) {
371+
var execution = inProgress.get(key);
372+
if (execution != null) {
373+
executions.add(execution);
374+
}
375+
}
376+
}
377+
if (executions.isEmpty()) {
378+
return Completable.complete();
379+
}
380+
381+
return Completable.fromPublisher(
382+
Flowable.fromIterable(executions)
383+
.flatMapSingle(e -> Single.fromObservable(e.completion)));
384+
});
385+
}
386+
360387
/**
361388
* Waits for the in-progress tasks to finish. Any tasks that are submitted after the call are not
362389
* waited.
@@ -535,6 +562,14 @@ public void shutdown() {
535562
cache.shutdown();
536563
}
537564

565+
/**
566+
* Returns a {@link Completable} which will completes once the in-progress tasks identified by
567+
* {@code keys} are completed. Tasks submitted after the call are not waited.
568+
*/
569+
public Completable waitInProgressTasks(Collection<KeyT> keys) {
570+
return cache.waitInProgressTasks(keys);
571+
}
572+
538573
/**
539574
* Waits for the in-progress tasks to finish. Any tasks that are submitted after the call are
540575
* not waited.

src/main/java/com/google/devtools/build/lib/vfs/OutputService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.google.common.collect.ImmutableCollection;
1818
import com.google.common.collect.ImmutableList;
1919
import com.google.common.collect.ImmutableMap;
20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.ListenableFuture;
2022
import com.google.devtools.build.lib.actions.Action;
2123
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
2224
import com.google.devtools.build.lib.actions.ActionInputMap;
@@ -33,6 +35,7 @@
3335
import com.google.devtools.build.lib.util.AbruptExitException;
3436
import com.google.devtools.build.skyframe.SkyFunction.Environment;
3537
import java.io.IOException;
38+
import java.util.Collection;
3639
import java.util.Map;
3740
import java.util.UUID;
3841
import javax.annotation.Nullable;
@@ -121,6 +124,10 @@ ModifiedFileSet startBuild(EventHandler eventHandler, UUID buildId, boolean fina
121124
/** Flush and wait for in-progress downloads. */
122125
default void flushOutputTree() throws InterruptedException {}
123126

127+
default ListenableFuture<Void> waitOutputDownloads(Collection<PathFragment> files) {
128+
return Futures.immediateVoidFuture();
129+
}
130+
124131
/**
125132
* Finish the build.
126133
*

0 commit comments

Comments
 (0)