Skip to content

Commit cf58f26

Browse files
feat: add an experimental feature to skip waiting for trailers for unary ops (#2404)
* feat: add an experimental feature to skip waiting for trailers for unary ops. This is off by default and can be enabled using an environment variable. When enabled, BigtableUnaryOperationCallable will resolve the user visible future immediately when a response is available and will tell metrics to freeze all timers. Metrics will still wait for the trailers in the background for necessary metadata to publish the frozen timer values. Change-Id: I2101ff375de711693720af4fd2e9535aa5355f9d * more testing Change-Id: Ifc95aa89c080ee8395d43adce1172f11354c306e * cosmetics Change-Id: I679aeac3ec7475757ce769f4c64ede1130b35ebd * comment Change-Id: Ia535905f4fed6f30854c05ceb300af39877ca4a1 * fix test Change-Id: I77664e40c9fd2d52b609f5063386b158cbc1e81e
1 parent 75d4105 commit cf58f26

File tree

10 files changed

+370
-46
lines changed

10 files changed

+370
-46
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java

+54-30
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,16 @@
2424
import com.google.api.gax.rpc.ServerStreamingCallable;
2525
import com.google.api.gax.rpc.StreamController;
2626
import com.google.api.gax.rpc.UnaryCallable;
27-
import com.google.api.gax.tracing.ApiTracer;
2827
import com.google.api.gax.tracing.ApiTracerFactory;
2928
import com.google.api.gax.tracing.SpanName;
29+
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
3030
import com.google.common.base.Preconditions;
31+
import com.google.common.util.concurrent.Futures;
3132
import io.grpc.Status;
33+
import java.util.concurrent.ExecutionException;
3234
import java.util.concurrent.atomic.AtomicBoolean;
3335
import java.util.logging.Level;
3436
import java.util.logging.Logger;
35-
import javax.annotation.Nullable;
3637

3738
/**
3839
* Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link
@@ -73,9 +74,10 @@ public BigtableUnaryOperationCallable(
7374
public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
7475
apiCallContext = defaultCallContext.merge(apiCallContext);
7576

76-
ApiTracer apiTracer =
77-
tracerFactory.newTracer(
78-
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);
77+
BigtableTracer apiTracer =
78+
(BigtableTracer)
79+
tracerFactory.newTracer(
80+
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);
7981

8082
apiCallContext = apiCallContext.withTracer(apiTracer);
8183

@@ -85,18 +87,15 @@ public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
8587
}
8688

8789
class UnaryFuture extends AbstractApiFuture<RespT> implements ResponseObserver<RespT> {
88-
private final ApiTracer tracer;
90+
private final BigtableTracer tracer;
8991
private final boolean allowNoResponse;
9092

9193
private StreamController controller;
9294
private final AtomicBoolean upstreamCancelled = new AtomicBoolean();
93-
private boolean responseReceived;
94-
private @Nullable RespT response;
9595

96-
private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) {
96+
private UnaryFuture(BigtableTracer tracer, boolean allowNoResponse) {
9797
this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null");
9898
this.allowNoResponse = allowNoResponse;
99-
this.responseReceived = false;
10099
}
101100

102101
@Override
@@ -130,23 +129,39 @@ private void cancelUpstream() {
130129
public void onResponse(RespT resp) {
131130
tracer.responseReceived();
132131

133-
// happy path - buffer the only responsse
134-
if (!responseReceived) {
135-
responseReceived = true;
136-
this.response = resp;
132+
if (set(resp)) {
133+
tracer.operationFinishEarly();
137134
return;
138135
}
139136

140-
String msg =
141-
String.format(
142-
"Received multiple responses for a %s unary operation. Previous: %s, New: %s",
143-
spanName, response, resp);
144-
logger.log(Level.WARNING, msg);
137+
// At this point we are guaranteed that the future has been resolved. However we need to check
138+
// why.
139+
// We know it's not because it was resolved with the current response. Moreover, since the
140+
// future
141+
// is resolved, our only means to flag the error is to log.
142+
// So there are 3 possibilities:
143+
// 1. user cancelled the future
144+
// 2. this is an extra response and the previous one resolved the future
145+
// 3. we got a response after the rpc failed (this should never happen and would be a bad bug)
145146

146-
InternalException error =
147-
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
148-
if (setException(error)) {
149-
tracer.operationFailed(error);
147+
if (isCancelled()) {
148+
return;
149+
}
150+
151+
try {
152+
RespT prev = Futures.getDone(this);
153+
String msg =
154+
String.format(
155+
"Received response after future is resolved for a %s unary operation. previous: %s, New response: %s",
156+
spanName, prev, resp);
157+
logger.log(Level.WARNING, msg);
158+
} catch (ExecutionException e) {
159+
// Should never happen
160+
String msg =
161+
String.format(
162+
"Received response after future resolved as a failure for a %s unary operation. New response: %s",
163+
spanName, resp);
164+
logger.log(Level.WARNING, msg, e.getCause());
150165
}
151166

152167
cancelUpstream();
@@ -158,18 +173,24 @@ public void onError(Throwable throwable) {
158173
tracer.operationFailed(throwable);
159174
} else if (isCancelled()) {
160175
tracer.operationCancelled();
176+
} else {
177+
// At this point the future has already been resolved, so we ignore the error
178+
tracer.operationSucceeded();
161179
}
162-
// The future might've been resolved due to double response
163180
}
164181

165182
@Override
166183
public void onComplete() {
167-
if (allowNoResponse || responseReceived) {
168-
if (set(response)) {
169-
tracer.operationSucceeded();
170-
return;
171-
}
172-
} else {
184+
if (allowNoResponse && set(null)) {
185+
tracer.operationSucceeded();
186+
return;
187+
188+
// Under normal circumstances the future would've been resolved in onResponse or via
189+
// set(null) if it is expected for
190+
// the rpc to not have a response. So if we aren't done, the only reason is that we didn't get
191+
// a response
192+
// but were expecting one
193+
} else if (!isDone()) {
173194
String msg = spanName + " unary operation completed without a response message";
174195
InternalException e =
175196
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
@@ -183,7 +204,10 @@ public void onComplete() {
183204
// check cancellation race
184205
if (isCancelled()) {
185206
tracer.operationCancelled();
207+
return;
186208
}
209+
210+
tracer.operationSucceeded();
187211
}
188212
}
189213
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
569569
* </ul>
570570
*/
571571
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
572-
if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) {
572+
if (!settings.getEnableSkipTrailers()) {
573573
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
574574
createReadRowsBaseCallable(
575575
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
@@ -1347,7 +1347,7 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar
13471347
UnaryCallSettings<ReqT, RespT> callSettings,
13481348
Function<ReqT, BaseReqT> requestTransformer,
13491349
Function<BaseRespT, RespT> responseTranformer) {
1350-
if (EnhancedBigtableStubSettings.SKIP_TRAILERS) {
1350+
if (settings.getEnableSkipTrailers()) {
13511351
return createUnaryCallableNew(
13521352
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
13531353
} else {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
109109
private static final boolean DIRECT_PATH_ENABLED =
110110
Boolean.parseBoolean(System.getenv("CBT_ENABLE_DIRECTPATH"));
111111

112-
static final boolean SKIP_TRAILERS =
112+
private static final boolean SKIP_TRAILERS =
113113
Optional.ofNullable(System.getenv("CBT_SKIP_HEADERS"))
114114
.map(Boolean::parseBoolean)
115115
.orElse(DIRECT_PATH_ENABLED);
@@ -240,6 +240,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
240240
private final Map<String, String> jwtAudienceMapping;
241241
private final boolean enableRoutingCookie;
242242
private final boolean enableRetryInfo;
243+
private final boolean enableSkipTrailers;
243244

244245
private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
245246
private final UnaryCallSettings<Query, Row> readRowSettings;
@@ -287,6 +288,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
287288
jwtAudienceMapping = builder.jwtAudienceMapping;
288289
enableRoutingCookie = builder.enableRoutingCookie;
289290
enableRetryInfo = builder.enableRetryInfo;
291+
enableSkipTrailers = builder.enableSkipTrailers;
290292
metricsProvider = builder.metricsProvider;
291293
metricsEndpoint = builder.metricsEndpoint;
292294

@@ -373,6 +375,10 @@ public boolean getEnableRetryInfo() {
373375
return enableRetryInfo;
374376
}
375377

378+
boolean getEnableSkipTrailers() {
379+
return enableSkipTrailers;
380+
}
381+
376382
/**
377383
* Gets the Google Cloud Monitoring endpoint for publishing client side metrics. If it's null,
378384
* client will publish metrics to the default monitoring endpoint.
@@ -683,6 +689,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
683689
private Map<String, String> jwtAudienceMapping;
684690
private boolean enableRoutingCookie;
685691
private boolean enableRetryInfo;
692+
private boolean enableSkipTrailers;
686693

687694
private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
688695
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
@@ -721,6 +728,7 @@ private Builder() {
721728
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
722729
this.enableRoutingCookie = true;
723730
this.enableRetryInfo = true;
731+
this.enableSkipTrailers = SKIP_TRAILERS;
724732
metricsProvider = DefaultMetricsProvider.INSTANCE;
725733

726734
// Defaults provider
@@ -1085,6 +1093,11 @@ public boolean getEnableRetryInfo() {
10851093
return enableRetryInfo;
10861094
}
10871095

1096+
Builder setEnableSkipTrailers(boolean enabled) {
1097+
this.enableSkipTrailers = enabled;
1098+
return this;
1099+
}
1100+
10881101
/** Returns the builder for the settings used for calls to readRows. */
10891102
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
10901103
return readRowsSettings;
@@ -1212,6 +1225,7 @@ public String toString() {
12121225
.add("jwtAudienceMapping", jwtAudienceMapping)
12131226
.add("enableRoutingCookie", enableRoutingCookie)
12141227
.add("enableRetryInfo", enableRetryInfo)
1228+
.add("enableSkipTrailers", enableSkipTrailers)
12151229
.add("readRowsSettings", readRowsSettings)
12161230
.add("readRowSettings", readRowSettings)
12171231
.add("sampleRowKeysSettings", sampleRowKeysSettings)

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

+7
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ public void afterResponse(long applicationLatency) {
5858
// noop
5959
}
6060

61+
/**
62+
* Used by BigtableUnaryOperationCallable to signal that the user visible portion of the RPC is
63+
* complete and that metrics should freeze the timers and then publish the frozen values when the
64+
* internal portion of the operation completes.
65+
*/
66+
public void operationFinishEarly() {}
67+
6168
/**
6269
* Get the attempt number of the current call. Attempt number for the current call is passed in
6370
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
5555
private final SpanName spanName;
5656

5757
// Operation level metrics
58+
private final AtomicBoolean operationFinishedEarly = new AtomicBoolean();
5859
private final AtomicBoolean opFinished = new AtomicBoolean();
5960
private final Stopwatch operationTimer = Stopwatch.createStarted();
6061
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
@@ -142,6 +143,13 @@ public void close() {}
142143
};
143144
}
144145

146+
@Override
147+
public void operationFinishEarly() {
148+
operationFinishedEarly.set(true);
149+
attemptTimer.stop();
150+
operationTimer.stop();
151+
}
152+
145153
@Override
146154
public void operationSucceeded() {
147155
recordOperationCompletion(null);
@@ -207,6 +215,11 @@ public void attemptPermanentFailure(Throwable throwable) {
207215
@Override
208216
public void onRequest(int requestCount) {
209217
requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);
218+
219+
if (operationFinishedEarly.get()) {
220+
return;
221+
}
222+
210223
if (flowControlIsDisabled) {
211224
// On request is only called when auto flow control is disabled. When auto flow control is
212225
// disabled, server latency is measured between onRequest and onResponse.
@@ -220,6 +233,10 @@ public void onRequest(int requestCount) {
220233

221234
@Override
222235
public void responseReceived() {
236+
if (operationFinishedEarly.get()) {
237+
return;
238+
}
239+
223240
if (firstResponsePerOpTimer.isRunning()) {
224241
firstResponsePerOpTimer.stop();
225242
}
@@ -241,6 +258,9 @@ public void responseReceived() {
241258
@Override
242259
public void afterResponse(long applicationLatency) {
243260
if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
261+
if (operationFinishedEarly.get()) {
262+
return;
263+
}
244264
// When auto flow control is enabled, request will never be called, so server latency is
245265
// measured between after the last response is processed and before the next response is
246266
// received. If flow control is disabled but requestLeft is greater than 0,
@@ -295,10 +315,14 @@ public void disableFlowControl() {
295315
}
296316

297317
private void recordOperationCompletion(@Nullable Throwable status) {
318+
if (operationFinishedEarly.get()) {
319+
status = null; // force an ok
320+
}
321+
298322
if (!opFinished.compareAndSet(false, true)) {
299323
return;
300324
}
301-
operationTimer.stop();
325+
long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);
302326

303327
boolean isStreaming = operationType == OperationType.ServerStreaming;
304328
String statusStr = Util.extractStatus(status);
@@ -317,8 +341,6 @@ private void recordOperationCompletion(@Nullable Throwable status) {
317341
.put(STATUS_KEY, statusStr)
318342
.build();
319343

320-
long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);
321-
322344
// Only record when retry count is greater than 0 so the retry
323345
// graph will be less confusing
324346
if (attemptCount > 1) {
@@ -339,6 +361,9 @@ private void recordOperationCompletion(@Nullable Throwable status) {
339361
}
340362

341363
private void recordAttemptCompletion(@Nullable Throwable status) {
364+
if (operationFinishedEarly.get()) {
365+
status = null; // force an ok
366+
}
342367
// If the attempt failed, the time spent in retry should be counted in application latency.
343368
// Stop the stopwatch and decrement requestLeft.
344369
synchronized (timerLock) {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java

+7
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ public void close() {
6262
};
6363
}
6464

65+
@Override
66+
public void operationFinishEarly() {
67+
for (BigtableTracer tracer : bigtableTracers) {
68+
tracer.operationFinishEarly();
69+
}
70+
}
71+
6572
@Override
6673
public void operationSucceeded() {
6774
for (ApiTracer child : children) {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public void close() {}
8484
};
8585
}
8686

87+
@Override
88+
public void operationFinishEarly() {
89+
attemptTimer.stop();
90+
operationTimer.stop();
91+
}
92+
8793
@Override
8894
public void operationSucceeded() {
8995
recordOperationCompletion(null);
@@ -103,7 +109,6 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
103109
if (!opFinished.compareAndSet(false, true)) {
104110
return;
105111
}
106-
operationTimer.stop();
107112

108113
long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);
109114

0 commit comments

Comments
 (0)