Skip to content

Commit b87a5ee

Browse files
igorbernstein2garrettjonesgoogle
authored andcommitted
BIgtable: 11. Implement ReadRows retries (#2986)
1 parent 9432956 commit b87a5ee

File tree

8 files changed

+744
-5
lines changed

8 files changed

+744
-5
lines changed

google-cloud-bigtable/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@
9191
<classifier>testlib</classifier>
9292
<scope>test</scope>
9393
</dependency>
94+
<dependency>
95+
<groupId>io.grpc</groupId>
96+
<artifactId>grpc-testing</artifactId>
97+
<scope>test</scope>
98+
</dependency>
9499
</dependencies>
95100
<profiles>
96101
<profile>

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.models;
1717

18+
import com.google.api.core.InternalExtensionOnly;
1819
import com.google.common.base.Preconditions;
1920
import com.google.protobuf.ByteString;
2021
import javax.annotation.Nonnull;
@@ -40,7 +41,8 @@
4041
* ByteStringRange r2 = r1.clone().endUnbounded();
4142
* }</pre>
4243
*/
43-
abstract class Range<T, R extends Range<T, R>> {
44+
@InternalExtensionOnly
45+
public abstract class Range<T, R extends Range<T, R>> {
4446
public enum BoundType {
4547
OPEN,
4648
CLOSED,

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+36-3
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.core.InternalApi;
20+
import com.google.api.gax.retrying.RetrySettings;
2021
import com.google.api.gax.rpc.ApiCallContext;
2122
import com.google.api.gax.rpc.Callables;
2223
import com.google.api.gax.rpc.ClientContext;
24+
import com.google.api.gax.rpc.ServerStreamingCallSettings;
2325
import com.google.api.gax.rpc.ServerStreamingCallable;
2426
import com.google.api.gax.rpc.UnaryCallable;
2527
import com.google.bigtable.v2.ReadRowsRequest;
@@ -35,10 +37,13 @@
3537
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
3638
import com.google.cloud.bigtable.data.v2.models.RowMutation;
3739
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
40+
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
41+
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
3842
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
3943
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
4044
import java.io.IOException;
4145
import java.util.List;
46+
import org.threeten.bp.Duration;
4247

4348
/**
4449
* The core client that converts method calls to RPCs.
@@ -75,6 +80,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
7580
.setEndpoint(settings.getEndpoint())
7681
.setCredentialsProvider(settings.getCredentialsProvider());
7782

83+
// ReadRow retries are handled in the overlay: disable retries in the base layer (but make
84+
// sure to preserve the exception callable settings).
85+
baseSettingsBuilder
86+
.readRowsSettings()
87+
.setSimpleTimeoutNoRetries(Duration.ofHours(2))
88+
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
89+
.setTimeoutCheckInterval(Duration.ZERO)
90+
.setIdleTimeout(Duration.ZERO);
91+
7892
// SampleRowKeys retries are handled in the overlay: disable retries in the base layer (but make
7993
// sure to preserve the exception callable settings.
8094
baseSettingsBuilder
@@ -135,8 +149,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
135149
* dispatch the RPC.
136150
* <li>Upon receiving the response stream, it will merge the {@link
137151
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
138-
* implementation can be configured in {@link
139-
* com.google.cloud.bigtable.data.v2.BigtableDataSettings}.
152+
* implementation can be configured in by the {@code rowAdapter} parameter.
140153
* <li>Retry/resume on failure.
141154
* <li>Filter out marker rows.
142155
* </ul>
@@ -147,7 +160,27 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
147160
ServerStreamingCallable<ReadRowsRequest, RowT> merging =
148161
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
149162

150-
FilterMarkerRowsCallable<RowT> filtering = new FilterMarkerRowsCallable<>(merging, rowAdapter);
163+
// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer
164+
// Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable).
165+
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
166+
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
167+
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
168+
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
169+
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
170+
.setTimeoutCheckInterval(settings.readRowsSettings().getTimeoutCheckInterval())
171+
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
172+
.build();
173+
174+
// Retry logic is split into 2 parts to workaround a rare edge case described in
175+
// ReadRowsRetryCompletedCallable
176+
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
177+
new ReadRowsRetryCompletedCallable<>(merging);
178+
179+
ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
180+
Callables.retrying(retrying1, innerSettings, clientContext);
181+
182+
FilterMarkerRowsCallable<RowT> filtering =
183+
new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
151184

152185
ServerStreamingCallable<ReadRowsRequest, RowT> withContext =
153186
filtering.withDefaultCallContext(clientContext.getDefaultCallContext());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.retrying.StreamResumptionStrategy;
20+
import com.google.bigtable.v2.ReadRowsRequest;
21+
import com.google.bigtable.v2.ReadRowsRequest.Builder;
22+
import com.google.bigtable.v2.RowRange;
23+
import com.google.bigtable.v2.RowSet;
24+
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
25+
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
26+
import com.google.common.base.Preconditions;
27+
import com.google.protobuf.ByteString;
28+
29+
/**
30+
* An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the
31+
* last complete row seen and upon retry can build a request to resume the stream from where it left
32+
* off.
33+
*
34+
* <p>This class is considered an internal implementation detail and not meant to be used by
35+
* applications.
36+
*/
37+
@InternalApi
38+
public class ReadRowsResumptionStrategy<RowT>
39+
implements StreamResumptionStrategy<ReadRowsRequest, RowT> {
40+
private final RowAdapter<RowT> rowAdapter;
41+
private ByteString lastKey = ByteString.EMPTY;
42+
// Number of rows processed excluding Marker row.
43+
private long numProcessed;
44+
45+
public ReadRowsResumptionStrategy(RowAdapter<RowT> rowAdapter) {
46+
this.rowAdapter = rowAdapter;
47+
}
48+
49+
@Override
50+
public boolean canResume() {
51+
return true;
52+
}
53+
54+
@Override
55+
public StreamResumptionStrategy<ReadRowsRequest, RowT> createNew() {
56+
return new ReadRowsResumptionStrategy<>(rowAdapter);
57+
}
58+
59+
@Override
60+
public void onProgress(RowT response) {
61+
// Last key can come from both the last processed row key and a synthetic row marker. The
62+
// synthetic row marker is emitted when the server has read a lot of data that was filtered out.
63+
// The row marker can be used to trim the start of the scan, but does not contribute to the row
64+
// limit.
65+
lastKey = rowAdapter.getKey(response);
66+
if (!rowAdapter.isScanMarkerRow(response)) {
67+
// Only real rows count towards the rows limit.
68+
numProcessed++;
69+
}
70+
}
71+
72+
/**
73+
* {@inheritDoc}
74+
*
75+
* <p>Given a request, this implementation will narrow that request to exclude all row keys and
76+
* ranges that would produce rows that come before {@link #lastKey}. Furthermore this
77+
* implementation takes care to update the row limit of the request to account for all of the
78+
* received rows.
79+
*/
80+
@Override
81+
public ReadRowsRequest getResumeRequest(ReadRowsRequest request) {
82+
// An empty lastKey means that we have not successfully read the first row,
83+
// so resume with the original request object.
84+
if (lastKey.isEmpty()) {
85+
return request;
86+
}
87+
88+
ReadRowsRequest originalRequest = request;
89+
90+
// Special case: empty query implies full table scan, so make this explicit by adding an
91+
// unbounded range to the request
92+
if (request.getRows().getRowKeysList().isEmpty()
93+
&& request.getRows().getRowRangesList().isEmpty()) {
94+
95+
originalRequest =
96+
request
97+
.toBuilder()
98+
.setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()))
99+
.build();
100+
}
101+
102+
// Start building the resume request. The keys & ranges are cleared and will be recomputed.
103+
Builder builder = originalRequest.toBuilder();
104+
builder.clearRows();
105+
106+
RowSet.Builder rowSetBuilder = RowSet.newBuilder();
107+
108+
for (ByteString key : originalRequest.getRows().getRowKeysList()) {
109+
if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) {
110+
rowSetBuilder.addRowKeys(key);
111+
}
112+
}
113+
114+
for (RowRange rowRange : originalRequest.getRows().getRowRangesList()) {
115+
RowRange.Builder rowRangeBuilder = RowRange.newBuilder();
116+
117+
switch (rowRange.getEndKeyCase()) {
118+
case END_KEY_CLOSED:
119+
if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) > 0) {
120+
rowRangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed());
121+
} else {
122+
continue;
123+
}
124+
break;
125+
case END_KEY_OPEN:
126+
if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) > 0) {
127+
rowRangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen());
128+
} else {
129+
continue;
130+
}
131+
break;
132+
case ENDKEY_NOT_SET:
133+
rowRangeBuilder.clearEndKey();
134+
break;
135+
default:
136+
throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase());
137+
}
138+
139+
switch (rowRange.getStartKeyCase()) {
140+
case STARTKEY_NOT_SET:
141+
rowRangeBuilder.setStartKeyOpen(lastKey);
142+
break;
143+
case START_KEY_OPEN:
144+
if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyOpen(), lastKey) < 0) {
145+
rowRangeBuilder.setStartKeyOpen(lastKey);
146+
} else {
147+
rowRangeBuilder.setStartKeyOpen(rowRange.getStartKeyOpen());
148+
}
149+
break;
150+
case START_KEY_CLOSED:
151+
if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyClosed(), lastKey) <= 0) {
152+
rowRangeBuilder.setStartKeyOpen(lastKey);
153+
} else {
154+
rowRangeBuilder.setStartKeyClosed(rowRange.getStartKeyClosed());
155+
}
156+
break;
157+
default:
158+
throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase());
159+
}
160+
rowSetBuilder.addRowRanges(rowRangeBuilder.build());
161+
}
162+
163+
// Edge case: retrying a fulfilled request.
164+
// A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it
165+
// had a row limit, has seen enough rows. These requests are replaced with a marker request that
166+
// will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable
167+
// for more details.
168+
if ((rowSetBuilder.getRowRangesCount() == 0 && rowSetBuilder.getRowKeysCount() == 0)
169+
|| (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == numProcessed)) {
170+
return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER;
171+
}
172+
173+
if (originalRequest.getRowsLimit() > 0) {
174+
Preconditions.checkState(
175+
originalRequest.getRowsLimit() > numProcessed,
176+
"Detected too many rows for the current row limit during a retry.");
177+
builder.setRowsLimit(originalRequest.getRowsLimit() - numProcessed);
178+
}
179+
180+
builder.setRows(rowSetBuilder.build());
181+
return builder.build();
182+
}
183+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.ServerStreamingCallable;
22+
import com.google.api.gax.rpc.StreamController;
23+
import com.google.bigtable.v2.ReadRowsRequest;
24+
25+
/**
26+
* This callable addresses edge case of a ReadRows stream receiving all of the rows, but receiving a
27+
* retryable error status instead of an OK. If a retry attempt is scheduled, then it should return
28+
* an OK response.
29+
*
30+
* <p>This callable works in tandem with {@link ReadRowsResumptionStrategy}, which will send a
31+
* {@link #FULFILLED_REQUEST_MARKER} to be processed by this callable. Upon receiving the {@link
32+
* #FULFILLED_REQUEST_MARKER}, this callable will promptly notify the {@link ResponseObserver} that
33+
* the stream has been successfully compeleted.
34+
*
35+
* <p>This class is considered an internal implementation detail and not meant to be used by
36+
* applications.
37+
*/
38+
@InternalApi
39+
public final class ReadRowsRetryCompletedCallable<RowT>
40+
extends ServerStreamingCallable<ReadRowsRequest, RowT> {
41+
static final ReadRowsRequest FULFILLED_REQUEST_MARKER =
42+
ReadRowsRequest.newBuilder().setRowsLimit(-1).build();
43+
44+
private final ServerStreamingCallable<ReadRowsRequest, RowT> inner;
45+
46+
public ReadRowsRetryCompletedCallable(ServerStreamingCallable<ReadRowsRequest, RowT> inner) {
47+
this.inner = inner;
48+
}
49+
50+
@Override
51+
public void call(
52+
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
53+
54+
if (request == FULFILLED_REQUEST_MARKER) {
55+
responseObserver.onStart(new DummyController());
56+
responseObserver.onComplete();
57+
} else {
58+
inner.call(request, responseObserver, context);
59+
}
60+
}
61+
62+
private static class DummyController implements StreamController {
63+
@Override
64+
public void cancel() {}
65+
66+
@Override
67+
public void disableAutoInboundFlowControl() {}
68+
69+
@Override
70+
public void request(int count) {}
71+
}
72+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
* An implementation of a {@link Reframer} that feeds the row merging {@link StateMachine}.
2626
*
2727
* <p>{@link com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver} pushes {@link
28-
* ReadRowsResponse.CellChunk}s into this class and pops fully merged logical rows. Example usage:
28+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into this class and pops fully merged logical
29+
* rows. Example usage:
2930
*
3031
* <pre>{@code
3132
* RowMerger<Row> rowMerger = new RowMerger<>(myRowBuilder);

0 commit comments

Comments
 (0)