-
Notifications
You must be signed in to change notification settings - Fork 1.1k
BIgtable: 11. Implement ReadRows retries #2986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BIgtable: 11. Implement ReadRows retries #2986
Conversation
google-cloud-bigtable/pom.xml
Outdated
<dependency> | ||
<groupId>io.grpc</groupId> | ||
<artifactId>grpc-testing</artifactId> | ||
</dependency> |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more to come.
@@ -75,6 +79,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) | |||
.setEndpoint(settings.getEndpoint()) | |||
.setCredentialsProvider(settings.getCredentialsProvider()); | |||
|
|||
// ReadRow retries are handled in the overlay: disable retries in the base layer (but make |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
/** | ||
* An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the | ||
* last complete row seen and upon retry can build a request to resume the stream from were it left |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
implements StreamResumptionStrategy<ReadRowsRequest, RowT> { | ||
private final RowAdapter<RowT> rowAdapter; | ||
private ByteString lastKey = ByteString.EMPTY; | ||
private long numProcessed; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} | ||
|
||
@Override | ||
public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
@Override | ||
public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { | ||
if (lastKey.isEmpty()) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* rows. The logical row representation is configurable via a RowAdapter. Please note that | ||
* this will also emit special marker rows that help with retries in the next stage, but need | ||
* to be filtered out. | ||
* <li>ReadRowsTracker (+ helpers): Implements resuming retries for gax's Callables#retrying |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if (request.getRowsLimit() > 0) { | ||
Preconditions.checkState( | ||
request.getRowsLimit() >= numProcessed, | ||
"Detected too many responses for the current row limit during a retry."); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Preconditions.checkState( | ||
request.getRowsLimit() >= numProcessed, | ||
"Detected too many responses for the current row limit during a retry."); | ||
builder.setRowsLimit(request.getRowsLimit() - numProcessed); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if (request.getRows().getRowKeysList().isEmpty() | ||
&& request.getRows().getRowRangesList().isEmpty()) { | ||
newRowSet.addRowRanges(RowRange.newBuilder().setStartKeyOpen(lastKey).build()); | ||
} else { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} | ||
|
||
for (RowRange rowRange : request.getRows().getRowRangesList()) { | ||
RowRange.Builder rangeBuilder = RowRange.newBuilder(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} | ||
break; | ||
case ENDKEY_NOT_SET: | ||
rangeBuilder.clearEndKey(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
@After | ||
public void tearDown() throws Exception { | ||
if (client != null) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Preconditions.checkState( | ||
request.getRowsLimit() >= numProcessed, | ||
"Detected too many responses for the current row limit during a retry."); | ||
builder.setRowsLimit(request.getRowsLimit() - numProcessed); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
// synthetic row marker is emitted when the server has read a lot of data that was filtered out. | ||
// So it can trim the start of the scan, but does not contribute to the row limit. | ||
lastKey = rowAdapter.getKey(response); | ||
if (!rowAdapter.isScanMarkerRow(response)) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
// Special case: empty query implies full table scan | ||
if (request.getRows().getRowKeysList().isEmpty() | ||
&& request.getRows().getRowRangesList().isEmpty()) { | ||
newRowSet.addRowRanges(RowRange.newBuilder().setStartKeyOpen(lastKey).build()); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} else { | ||
for (ByteString key : request.getRows().getRowKeysList()) { | ||
if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { | ||
newRowSet.addRowKeys(key); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
import org.mockito.runners.MockitoJUnitRunner; | ||
|
||
@RunWith(MockitoJUnitRunner.class) | ||
public class ReadRowsRetryTest { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
All feedback should be addressed. PTAL |
Thanks for reviewing! And also, really good catch on retrying a fulfilled request bug! All feedback should be addressed, PTAL |
@garrettjonesgoogle PTAL when you have a moment. If it looks ok, please merge |
Builder builder = request.toBuilder(); | ||
|
||
if (request.getRowsLimit() > 0) { | ||
// NOTE: the edge case of request.getRowsLimit() == numProcessed is handled below |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if (request.getRows().getRowKeysList().isEmpty() | ||
&& request.getRows().getRowRangesList().isEmpty()) { | ||
|
||
request = |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
All feedback should be addressed, PTAL |
* an OK response. | ||
* | ||
* <p>This callable works in tandem with {@link ReadRowsResumptionStrategy}, which will send a | ||
* {@link #FULFILLED_REQUEST_MARKER} to be processed by this callable. Which will promptly notify |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* Implementation details for {@link | ||
* com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub#readRowsCallable()}. | ||
* | ||
* <p>The ReadRows protocol is optimized for transmission and cannot be consumed directly. This |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* to be filtered out. | ||
* <li>RowMerger (+ helpers): Implements resuming retries for gax's Callables#retrying middleware. | ||
* <li>FilterMarkerRowsCallable: Filters out marker rows that are used for efficient retry | ||
* resumes. This is necessary because |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
public void onProgress(RowT response) { | ||
// Last key can come from both the last processed row key and a synthetic row marker. The | ||
// synthetic row marker is emitted when the server has read a lot of data that was filtered out. | ||
// So it can trim the start of the scan, but does not contribute to the row limit. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@Override | ||
public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { | ||
// An empty lastKey means that we have not successfully read the first row, | ||
// resume with the original request object. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
ReadRowsRequest originalRequest = request; | ||
|
||
// Special case: empty query implies full table scan, make this explicit by adding an unbounded |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
continue; | ||
} else { | ||
rowRangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); | ||
} |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} | ||
|
||
// Edge case: retrying a fulfilled request. | ||
// A fulfilled request is one that has had all of its row keys and ranges fulfilled or if it had |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -75,6 +80,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) | |||
.setEndpoint(settings.getEndpoint()) | |||
.setCredentialsProvider(settings.getCredentialsProvider()); | |||
|
|||
// ReadRow retries are handled in the overlay: disable retries in the base layer (but make | |||
// sure to preserve the exception callable settings. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
7682c94
to
92b4a71
Compare
Rebased & incorporated all feedback. PTAL, if it looks ok, please merge |
LGTM |
No description provided.