Skip to content

Commit 71e35b1

Browse files
Googlerphilwo
Googler
authored andcommitted
Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 5)
Change MissingDigestsFinder#findMissingDigests and RemoteExecutionClient#executeRemotely to use RemoteActionExecutionContext. Removed all the usages of io.grpc.Context in the client code. Fixed the regression about NetworkTime introduced by bc54c64. PiperOrigin-RevId: 354479787
1 parent c378d9d commit 71e35b1

37 files changed

+660
-775
lines changed

src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,10 @@
3030
import com.google.devtools.build.lib.buildeventstream.PathConverter;
3131
import com.google.devtools.build.lib.collect.ImmutableIterable;
3232
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
33-
import com.google.devtools.build.lib.remote.common.NetworkTime;
3433
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
35-
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
3634
import com.google.devtools.build.lib.remote.util.DigestUtil;
3735
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
3836
import com.google.devtools.build.lib.vfs.Path;
39-
import io.grpc.Context;
4037
import io.netty.util.AbstractReferenceCounted;
4138
import io.netty.util.ReferenceCounted;
4239
import java.io.IOException;
@@ -161,7 +158,9 @@ private static List<PathMetadata> processQueryResult(
161158
*/
162159
private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
163160
ImmutableList<ListenableFuture<PathMetadata>> allPaths) throws Exception {
164-
Context ctx = TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload");
161+
RequestMetadata metadata =
162+
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload");
163+
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
165164

166165
List<PathMetadata> knownRemotePaths = new ArrayList<>(allPaths.size());
167166
List<PathMetadata> filesToQuery = new ArrayList<>();
@@ -181,7 +180,7 @@ private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
181180
return Futures.immediateFuture(ImmutableIterable.from(knownRemotePaths));
182181
}
183182
return Futures.transform(
184-
ctx.call(() -> missingDigestsFinder.findMissingDigests(digestsToQuery)),
183+
missingDigestsFinder.findMissingDigests(context, digestsToQuery),
185184
(missingDigests) -> {
186185
List<PathMetadata> filesToQueryUpdated = processQueryResult(missingDigests, filesToQuery);
187186
return ImmutableIterable.from(Iterables.concat(knownRemotePaths, filesToQueryUpdated));
@@ -197,8 +196,7 @@ private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
197196
ImmutableIterable<PathMetadata> allPaths) {
198197
RequestMetadata metadata =
199198
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload");
200-
RemoteActionExecutionContext context =
201-
new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
199+
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
202200

203201
ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathsUploaded =
204202
ImmutableList.builder();

src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ void shutdown() {
208208
* boolean)} instead.
209209
*/
210210
@Deprecated
211-
@VisibleForTesting
212211
public ListenableFuture<Void> uploadBlobAsync(
213212
RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) {
214213
Digest digest =

src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import build.bazel.remote.execution.v2.ExecuteResponse;
2020
import build.bazel.remote.execution.v2.ExecutionGrpc;
2121
import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionBlockingStub;
22+
import build.bazel.remote.execution.v2.RequestMetadata;
2223
import build.bazel.remote.execution.v2.WaitExecutionRequest;
2324
import com.google.common.base.Preconditions;
2425
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
2526
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
2627
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
2728
import com.google.devtools.build.lib.remote.Retrier.Backoff;
2829
import com.google.devtools.build.lib.remote.common.OperationObserver;
30+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
2931
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
3032
import com.google.devtools.build.lib.remote.options.RemoteOptions;
3133
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -71,9 +73,9 @@ public ExperimentalGrpcRemoteExecutor(
7173
this.retrier = retrier;
7274
}
7375

74-
private ExecutionBlockingStub executionBlockingStub() {
76+
private ExecutionBlockingStub executionBlockingStub(RequestMetadata metadata) {
7577
return ExecutionGrpc.newBlockingStub(channel)
76-
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
78+
.withInterceptors(TracingMetadataUtils.attachMetadataInterceptor(metadata))
7779
.withCallCredentials(callCredentialsProvider.getCallCredentials())
7880
.withDeadlineAfter(remoteOptions.remoteTimeout.getSeconds(), SECONDS);
7981
}
@@ -310,11 +312,16 @@ static ExecuteResponse extractResponseOrThrowIfError(Operation operation) throws
310312
}
311313

312314
@Override
313-
public ExecuteResponse executeRemotely(ExecuteRequest request, OperationObserver observer)
315+
public ExecuteResponse executeRemotely(
316+
RemoteActionExecutionContext context, ExecuteRequest request, OperationObserver observer)
314317
throws IOException, InterruptedException {
315318
Execution execution =
316319
new Execution(
317-
request, observer, retrier, callCredentialsProvider, this::executionBlockingStub);
320+
request,
321+
observer,
322+
retrier,
323+
callCredentialsProvider,
324+
() -> this.executionBlockingStub(context.getRequestMetadata()));
318325
return execution.start();
319326
}
320327

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import com.google.devtools.build.lib.remote.util.Utils;
5757
import com.google.devtools.build.lib.vfs.Path;
5858
import com.google.protobuf.ByteString;
59-
import io.grpc.Context;
6059
import io.grpc.Status;
6160
import io.grpc.Status.Code;
6261
import io.grpc.StatusRuntimeException;
@@ -122,9 +121,11 @@ private int computeMaxMissingBlobsDigestsPerMessage() {
122121
return (options.maxOutboundMessageSize - overhead) / digestSize;
123122
}
124123

125-
private ContentAddressableStorageFutureStub casFutureStub() {
124+
private ContentAddressableStorageFutureStub casFutureStub(RemoteActionExecutionContext context) {
126125
return ContentAddressableStorageGrpc.newFutureStub(channel)
127-
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
126+
.withInterceptors(
127+
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
128+
new NetworkTimeInterceptor(context::getNetworkTime))
128129
.withCallCredentials(callCredentialsProvider.getCallCredentials())
129130
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
130131
}
@@ -176,7 +177,8 @@ public static boolean isRemoteCacheOptions(RemoteOptions options) {
176177
}
177178

178179
@Override
179-
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
180+
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
181+
RemoteActionExecutionContext context, Iterable<Digest> digests) {
180182
if (Iterables.isEmpty(digests)) {
181183
return Futures.immediateFuture(ImmutableSet.of());
182184
}
@@ -187,13 +189,13 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest
187189
for (Digest digest : digests) {
188190
requestBuilder.addBlobDigests(digest);
189191
if (requestBuilder.getBlobDigestsCount() == maxMissingBlobsDigestsPerMessage) {
190-
getMissingDigestCalls.add(getMissingDigests(requestBuilder.build()));
192+
getMissingDigestCalls.add(getMissingDigests(context, requestBuilder.build()));
191193
requestBuilder.clearBlobDigests();
192194
}
193195
}
194196

195197
if (requestBuilder.getBlobDigestsCount() > 0) {
196-
getMissingDigestCalls.add(getMissingDigests(requestBuilder.build()));
198+
getMissingDigestCalls.add(getMissingDigests(context, requestBuilder.build()));
197199
}
198200

199201
ListenableFuture<ImmutableSet<Digest>> success =
@@ -209,7 +211,7 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest
209211
},
210212
MoreExecutors.directExecutor());
211213

212-
RequestMetadata requestMetadata = TracingMetadataUtils.fromCurrentContext();
214+
RequestMetadata requestMetadata = context.getRequestMetadata();
213215
return Futures.catchingAsync(
214216
success,
215217
RuntimeException.class,
@@ -226,10 +228,9 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest
226228
}
227229

228230
private ListenableFuture<FindMissingBlobsResponse> getMissingDigests(
229-
FindMissingBlobsRequest request) {
230-
Context ctx = Context.current();
231+
RemoteActionExecutionContext context, FindMissingBlobsRequest request) {
231232
return Utils.refreshIfUnauthenticatedAsync(
232-
() -> retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request))),
233+
() -> retrier.executeAsync(() -> casFutureStub(context).findMissingBlobs(request)),
233234
callCredentialsProvider);
234235
}
235236

src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import build.bazel.remote.execution.v2.ExecuteResponse;
1919
import build.bazel.remote.execution.v2.ExecutionGrpc;
2020
import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionBlockingStub;
21+
import build.bazel.remote.execution.v2.RequestMetadata;
2122
import build.bazel.remote.execution.v2.WaitExecutionRequest;
2223
import com.google.common.base.Preconditions;
2324
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
2425
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
2526
import com.google.devtools.build.lib.remote.common.OperationObserver;
27+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
2628
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
2729
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
2830
import com.google.devtools.build.lib.remote.util.Utils;
@@ -55,9 +57,9 @@ public GrpcRemoteExecutor(
5557
this.retrier = retrier;
5658
}
5759

58-
private ExecutionBlockingStub execBlockingStub() {
60+
private ExecutionBlockingStub execBlockingStub(RequestMetadata metadata) {
5961
return ExecutionGrpc.newBlockingStub(channel)
60-
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
62+
.withInterceptors(TracingMetadataUtils.attachMetadataInterceptor(metadata))
6163
.withCallCredentials(callCredentialsProvider.getCallCredentials());
6264
}
6365

@@ -105,7 +107,8 @@ private ExecuteResponse getOperationResponse(Operation op) throws IOException {
105107
* trigger a retry of the Execute call, resulting in a new Operation.
106108
* */
107109
@Override
108-
public ExecuteResponse executeRemotely(ExecuteRequest request, OperationObserver observer)
110+
public ExecuteResponse executeRemotely(
111+
RemoteActionExecutionContext context, ExecuteRequest request, OperationObserver observer)
109112
throws IOException, InterruptedException {
110113
// Execute has two components: the Execute call and (optionally) the WaitExecution call.
111114
// This is the simple flow without any errors:
@@ -149,9 +152,9 @@ public ExecuteResponse executeRemotely(ExecuteRequest request, OperationObserver
149152
WaitExecutionRequest.newBuilder()
150153
.setName(operation.get().getName())
151154
.build();
152-
replies = execBlockingStub().waitExecution(wr);
155+
replies = execBlockingStub(context.getRequestMetadata()).waitExecution(wr);
153156
} else {
154-
replies = execBlockingStub().execute(request);
157+
replies = execBlockingStub(context.getRequestMetadata()).execute(request);
155158
}
156159
try {
157160
while (replies.hasNext()) {

src/main/java/com/google/devtools/build/lib/remote/NetworkTimeInterceptor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.grpc.Channel;
2020
import io.grpc.ClientCall;
2121
import io.grpc.ClientInterceptor;
22-
import io.grpc.Context;
2322
import io.grpc.ForwardingClientCall;
2423
import io.grpc.ForwardingClientCallListener;
2524
import io.grpc.Metadata;
@@ -30,7 +29,6 @@
3029
/** The ClientInterceptor used to track network time. */
3130
public class NetworkTimeInterceptor implements ClientInterceptor {
3231

33-
public static final Context.Key<NetworkTime> CONTEXT_KEY = Context.key("remote-network-time");
3432
private final Supplier<NetworkTime> networkTimeSupplier;
3533

3634
public NetworkTimeInterceptor(Supplier<NetworkTime> networkTimeSupplier) {

src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,12 @@
3333
import com.google.devtools.build.lib.profiler.ProfilerTask;
3434
import com.google.devtools.build.lib.profiler.SilentCloseable;
3535
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
36-
import com.google.devtools.build.lib.remote.common.NetworkTime;
3736
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
38-
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
3937
import com.google.devtools.build.lib.remote.util.DigestUtil;
4038
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
4139
import com.google.devtools.build.lib.remote.util.Utils;
4240
import com.google.devtools.build.lib.sandbox.SandboxHelpers;
4341
import com.google.devtools.build.lib.vfs.Path;
44-
import io.grpc.Context;
4542
import java.io.IOException;
4643
import java.util.HashMap;
4744
import java.util.HashSet;
@@ -167,48 +164,42 @@ private ListenableFuture<Void> downloadFileAsync(Path path, FileArtifactValue me
167164
if (download == null) {
168165
RequestMetadata requestMetadata =
169166
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, metadata.getActionId());
170-
RemoteActionExecutionContext remoteActionExecutionContext =
171-
new RemoteActionExecutionContextImpl(requestMetadata, new NetworkTime());
172-
Context ctx = TracingMetadataUtils.contextWithMetadata(requestMetadata);
173-
Context prevCtx = ctx.attach();
174-
try {
175-
Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
176-
download = remoteCache.downloadFile(remoteActionExecutionContext, path, digest);
177-
downloadsInProgress.put(path, download);
178-
Futures.addCallback(
179-
download,
180-
new FutureCallback<Void>() {
181-
@Override
182-
public void onSuccess(Void v) {
183-
synchronized (lock) {
184-
downloadsInProgress.remove(path);
185-
downloadedPaths.add(path);
186-
}
187-
188-
try {
189-
path.chmod(0755);
190-
} catch (IOException e) {
191-
logger.atWarning().withCause(e).log("Failed to chmod 755 on %s", path);
192-
}
167+
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(requestMetadata);
168+
169+
Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
170+
download = remoteCache.downloadFile(context, path, digest);
171+
downloadsInProgress.put(path, download);
172+
Futures.addCallback(
173+
download,
174+
new FutureCallback<Void>() {
175+
@Override
176+
public void onSuccess(Void v) {
177+
synchronized (lock) {
178+
downloadsInProgress.remove(path);
179+
downloadedPaths.add(path);
193180
}
194181

195-
@Override
196-
public void onFailure(Throwable t) {
197-
synchronized (lock) {
198-
downloadsInProgress.remove(path);
199-
}
200-
try {
201-
path.delete();
202-
} catch (IOException e) {
203-
logger.atWarning().withCause(e).log(
204-
"Failed to delete output file after incomplete download: %s", path);
205-
}
182+
try {
183+
path.chmod(0755);
184+
} catch (IOException e) {
185+
logger.atWarning().withCause(e).log("Failed to chmod 755 on %s", path);
206186
}
207-
},
208-
MoreExecutors.directExecutor());
209-
} finally {
210-
ctx.detach(prevCtx);
211-
}
187+
}
188+
189+
@Override
190+
public void onFailure(Throwable t) {
191+
synchronized (lock) {
192+
downloadsInProgress.remove(path);
193+
}
194+
try {
195+
path.delete();
196+
} catch (IOException e) {
197+
logger.atWarning().withCause(e).log(
198+
"Failed to delete output file after incomplete download: %s", path);
199+
}
200+
}
201+
},
202+
MoreExecutors.directExecutor());
212203
}
213204
return download;
214205
}

src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ private void uploadOutputs(
188188
digests.addAll(digestToFile.keySet());
189189
digests.addAll(digestToBlobs.keySet());
190190

191-
ImmutableSet<Digest> digestsToUpload = getFromFuture(cacheProtocol.findMissingDigests(digests));
191+
ImmutableSet<Digest> digestsToUpload =
192+
getFromFuture(cacheProtocol.findMissingDigests(context, digests));
192193
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
193194
for (Digest digest : digestsToUpload) {
194195
Path file = digestToFile.get(digest);

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void ensureInputsPresent(
6262
Iterable<Digest> allDigests =
6363
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
6464
ImmutableSet<Digest> missingDigests =
65-
getFromFuture(cacheProtocol.findMissingDigests(allDigests));
65+
getFromFuture(cacheProtocol.findMissingDigests(context, allDigests));
6666

6767
List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
6868
for (Digest missingDigest : missingDigests) {

0 commit comments

Comments
 (0)