Skip to content

Commit 799710f

Browse files
igorbernstein2garrettjonesgoogle
authored andcommitted
Bigtable: Implement query sharding by generalizing ReadRows resume request builder. (#3103)
The generalized sharding can be used by map reduce style frameworks like beam.
1 parent edc7aff commit 799710f

File tree

6 files changed

+1164
-80
lines changed

6 files changed

+1164
-80
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,388 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.internal;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.auto.value.AutoValue;
20+
import com.google.bigtable.v2.ReadRowsRequest;
21+
import com.google.bigtable.v2.RowRange;
22+
import com.google.bigtable.v2.RowSet;
23+
import com.google.cloud.bigtable.data.v2.models.Query;
24+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
25+
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
26+
import com.google.common.base.Preconditions;
27+
import com.google.common.collect.ComparisonChain;
28+
import com.google.common.collect.ImmutableSortedSet;
29+
import com.google.common.collect.Lists;
30+
import com.google.protobuf.ByteString;
31+
import java.util.Arrays;
32+
import java.util.Comparator;
33+
import java.util.List;
34+
import java.util.SortedSet;
35+
import javax.annotation.Nonnull;
36+
import javax.annotation.Nullable;
37+
38+
/**
39+
* Internal helper to split a {@link RowSet} into segments based on keys.
40+
*
41+
* <p>This class is considered an internal implementation detail and not meant to be used by
42+
* applications.
43+
*
44+
* @see Query#shard(List)
45+
* @see Query#getBound()
46+
* @see ReadRowsResumptionStrategy#getResumeRequest(ReadRowsRequest)
47+
*/
48+
@InternalApi
49+
public final class RowSetUtil {
50+
private RowSetUtil() {}
51+
52+
/**
53+
* Splits the provided {@link RowSet} along the provided splitPoint into 2 segments.
54+
* The right segment will contain all keys that are strictly greater than the splitPoint and all
55+
* {@link RowRange}s truncated to start right after the splitPoint.
56+
*/
57+
@Nonnull
58+
public static Split split(@Nonnull RowSet rowSet, @Nonnull ByteString splitPoint) {
59+
ImmutableSortedSet<ByteString> splitPoints =
60+
ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE).add(splitPoint).build();
61+
62+
List<RowSet> splits = split(rowSet, splitPoints, true);
63+
64+
return Split.of(splits.get(0), splits.get(1));
65+
}
66+
67+
/**
68+
* Splits the provided {@link RowSet} into segments partitioned by the provided {@code
69+
* splitPoints}. Each split point represents the last row of the corresponding segment. The row
70+
* keys contained in the provided {@link RowSet} will be distributed across the segments. Each
71+
* range in the {@link RowSet} will be split up across each segment.
72+
*
73+
* @see #split(RowSet, SortedSet, boolean) for more details.
74+
*/
75+
@Nonnull
76+
public static List<RowSet> shard(
77+
@Nonnull RowSet rowSet, @Nonnull SortedSet<ByteString> splitPoints) {
78+
return split(rowSet, splitPoints, false);
79+
}
80+
81+
/**
82+
* Split a {@link RowSet} into segments.
83+
*
84+
* <p>Each segment is defined by a split point. The split point identifies the segment's inclusive
85+
* end. This means that the first segment will start at the beginning of the table and extend to
86+
* include the first split point. The last segment will start just after the last split point and
87+
* extend until the end of the table. The maximum number of segments that can be returned is the
88+
* number of split points + 1.
89+
*
90+
* <p>Each segment is represented by a RowSet in the returned List. Each of the returned RowSets
91+
* will contain all of the {@link RowRange}s and keys that fall between the previous segment and
92+
* this segment's split point. If there are no {@link RowRange}s or keys that belong to a segment,
93+
* then that segment will either be omitted or if {@code preserveNullSegments} is true, then it
94+
* will be represented by a null value in the returned list.
95+
*
96+
* <p>The segments in the returned list are guaranteed to be sorted. If {@code
97+
* preserveNullSegments} is true, then it will have exactly {@code splitPoints.size() + 1} items.
98+
* The extra segment will contain keys and {@link RowRange}s between the last splitPoint and the
99+
* end of the table.
100+
*
101+
* <p>Please note that an empty {@link RowSet} is treated like a full table scan and each segment
102+
* will contain a {@link RowRange} that covers the full extent of the segment.
103+
*/
104+
@Nonnull
105+
static List<RowSet> split(
106+
@Nonnull RowSet rowSet,
107+
@Nonnull SortedSet<ByteString> splitPoints,
108+
boolean preserveNullSegments) {
109+
// An empty RowSet represents a full table scan. Make that explicit so that there is RowRange to
110+
// split.
111+
if (RowSet.getDefaultInstance().equals(rowSet)) {
112+
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
113+
}
114+
115+
// Create sorted copies of the ranges and keys in the RowSet
116+
ByteString[] rowKeys =
117+
rowSet.getRowKeysList().toArray(new ByteString[rowSet.getRowKeysCount()]);
118+
RowRange[] rowRanges =
119+
rowSet.getRowRangesList().toArray(new RowRange[rowSet.getRowRangesCount()]);
120+
121+
Arrays.sort(rowKeys, ByteStringComparator.INSTANCE);
122+
Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
123+
124+
List<RowSet> results = Lists.newArrayList();
125+
126+
// Track consumption of input ranges & keys.
127+
int rowKeysStart = 0;
128+
int rowRangesStart = 0;
129+
130+
// Keys and ranges that lie before the current split point,
131+
RowSet.Builder segment = RowSet.newBuilder();
132+
boolean isSegmentEmpty = true;
133+
134+
for (ByteString splitPoint : splitPoints) {
135+
Preconditions.checkState(!splitPoint.isEmpty(), "Split point can't be empty");
136+
137+
// Consume all of the row keys that lie on and to the left of the split point. Consumption is
138+
// designated by advancing rowKeysStart.
139+
for (int i = rowKeysStart; i < rowKeys.length; i++) {
140+
ByteString rowKey = rowKeys[i];
141+
if (ByteStringComparator.INSTANCE.compare(rowKey, splitPoint) <= 0) {
142+
segment.addRowKeys(rowKey);
143+
isSegmentEmpty = false;
144+
rowKeysStart++;
145+
} else {
146+
// This key and all following keys belong to a later segment.
147+
break;
148+
}
149+
}
150+
151+
// Consume all of the ranges that lie before the split point (splitting the range if
152+
// necessary). Consumption is designated by advancing rowRangesStart.
153+
for (int i = rowRangesStart; i < rowRanges.length; i++) {
154+
RowRange rowRange = rowRanges[i];
155+
156+
// Break early when encountering the first start point that is past the split point.
157+
// (The split point is the inclusive end of of the segment)
158+
int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(splitPoint, true));
159+
if (startCmp > 0) {
160+
break;
161+
}
162+
163+
// Some part of this range will be in the segment.
164+
isSegmentEmpty = false;
165+
166+
// Figure out the endpoint and remainder.
167+
int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(splitPoint, true));
168+
if (endCmp <= 0) {
169+
// The range is fully contained in the segment.
170+
segment.addRowRanges(rowRange);
171+
172+
// Consume the range, but take care to shift partially consumed ranges to fill the gap
173+
// created by consuming the current range. For example if the list contained the following
174+
// ranges: [a-z], [b-d], [f-z] and the split point was 'e'. Then after processing the
175+
// split point, the list would contain: (d-z], GAP, [f-z]. So we fill the gap by shifting
176+
// (d-z] over by one and advancing rowRangesStart.
177+
// Partially consumed ranges will only exist if the original RowSet had overlapping
178+
// ranges, this should be a rare occurrence.
179+
System.arraycopy(
180+
rowRanges, rowRangesStart, rowRanges, rowRangesStart + 1, i - rowRangesStart);
181+
rowRangesStart++;
182+
} else {
183+
// The range is split:
184+
// Add the left part to the segment
185+
RowRange leftSubRange = rowRange.toBuilder().setEndKeyClosed(splitPoint).build();
186+
segment.addRowRanges(leftSubRange);
187+
// Save the remainder for the next segment. This is done by replacing the current rowRange
188+
// with the remainder and not advancing rowRangesStart.
189+
RowRange rightSubRange = rowRange.toBuilder().setStartKeyOpen(splitPoint).build();
190+
rowRanges[i] = rightSubRange;
191+
}
192+
}
193+
194+
// Build the current segment
195+
if (!isSegmentEmpty) {
196+
results.add(segment.build());
197+
isSegmentEmpty = true;
198+
segment = RowSet.newBuilder();
199+
} else if (preserveNullSegments) {
200+
results.add(null);
201+
}
202+
}
203+
204+
// Create the last segment (from the last splitKey to the end of the table)
205+
for (int i = rowKeysStart; i < rowKeys.length; i++) {
206+
isSegmentEmpty = false;
207+
segment.addRowKeys(rowKeys[i]);
208+
}
209+
for (int i = rowRangesStart; i < rowRanges.length; i++) {
210+
isSegmentEmpty = false;
211+
segment.addRowRanges(rowRanges[i]);
212+
}
213+
if (!isSegmentEmpty) {
214+
results.add(segment.build());
215+
} else if (preserveNullSegments) {
216+
results.add(null);
217+
}
218+
219+
return results;
220+
}
221+
222+
/** Get the bounding range of a {@link RowSet}. */
223+
public static ByteStringRange getBound(RowSet rowSet) {
224+
// Find min & max keys
225+
ByteString minKey = null;
226+
ByteString maxKey = null;
227+
228+
for (ByteString key : rowSet.getRowKeysList()) {
229+
if (minKey == null || ByteStringComparator.INSTANCE.compare(minKey, key) > 0) {
230+
minKey = key;
231+
}
232+
if (maxKey == null || ByteStringComparator.INSTANCE.compare(maxKey, key) < 0) {
233+
maxKey = key;
234+
}
235+
}
236+
237+
// Convert min & max keys in start & end points for a range
238+
StartPoint minStartPoint = null;
239+
EndPoint maxEndPoint = null;
240+
if (minKey != null) {
241+
minStartPoint = new StartPoint(minKey, true);
242+
}
243+
if (maxKey != null) {
244+
maxEndPoint = new EndPoint(maxKey, true);
245+
}
246+
247+
// Expand the range using the RowSet ranges
248+
for (RowRange rowRange : rowSet.getRowRangesList()) {
249+
StartPoint currentStartPoint = StartPoint.extract(rowRange);
250+
if (minStartPoint == null || minStartPoint.compareTo(currentStartPoint) > 0) {
251+
minStartPoint = currentStartPoint;
252+
}
253+
254+
EndPoint currentEndpoint = EndPoint.extract(rowRange);
255+
if (maxEndPoint == null || maxEndPoint.compareTo(currentEndpoint) < 0) {
256+
maxEndPoint = currentEndpoint;
257+
}
258+
}
259+
260+
// Build a range using the endpoints
261+
ByteStringRange boundingRange = ByteStringRange.unbounded();
262+
if (minStartPoint != null) {
263+
if (minStartPoint.isClosed) {
264+
boundingRange.startClosed(minStartPoint.value);
265+
} else {
266+
boundingRange.startOpen(minStartPoint.value);
267+
}
268+
}
269+
if (maxEndPoint != null) {
270+
if (maxEndPoint.isClosed) {
271+
boundingRange.endClosed(maxEndPoint.value);
272+
} else {
273+
boundingRange.endOpen(maxEndPoint.value);
274+
}
275+
}
276+
277+
return boundingRange;
278+
}
279+
280+
/**
281+
* Represents a RowSet split into 2 non-overlapping parts.
282+
*
283+
* <p>This class is considered an internal implementation detail and not meant to be used by
284+
* applications.
285+
*/
286+
@InternalApi
287+
@AutoValue
288+
public static abstract class Split {
289+
@Nullable
290+
public abstract RowSet getLeft();
291+
@Nullable
292+
public abstract RowSet getRight();
293+
294+
public static Split of(RowSet left, RowSet right) {
295+
return new AutoValue_RowSetUtil_Split(left, right);
296+
}
297+
}
298+
299+
private static final Comparator<RowRange> RANGE_START_COMPARATOR =
300+
new Comparator<RowRange>() {
301+
@Override
302+
public int compare(@Nonnull RowRange o1, @Nonnull RowRange o2) {
303+
return StartPoint.extract(o1).compareTo(StartPoint.extract(o2));
304+
}
305+
};
306+
307+
/** Helper class to ease comparison of RowRange start points. */
308+
private static final class StartPoint implements Comparable<StartPoint> {
309+
private final ByteString value;
310+
private final boolean isClosed;
311+
312+
@Nonnull
313+
static StartPoint extract(@Nonnull RowRange rowRange) {
314+
switch (rowRange.getStartKeyCase()) {
315+
case STARTKEY_NOT_SET:
316+
return new StartPoint(ByteString.EMPTY, true);
317+
case START_KEY_CLOSED:
318+
return new StartPoint(rowRange.getStartKeyClosed(), true);
319+
case START_KEY_OPEN:
320+
if (rowRange.getStartKeyOpen().isEmpty()) {
321+
// Take care to normalize an open empty start key to be closed.
322+
return new StartPoint(ByteString.EMPTY, true);
323+
} else {
324+
return new StartPoint(rowRange.getStartKeyOpen(), false);
325+
}
326+
default:
327+
throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase());
328+
}
329+
}
330+
331+
StartPoint(@Nonnull ByteString value, boolean isClosed) {
332+
this.value = value;
333+
this.isClosed = isClosed;
334+
}
335+
336+
@Override
337+
public int compareTo(@Nonnull StartPoint o) {
338+
return ComparisonChain.start()
339+
// Empty string comes first
340+
.compareTrueFirst(value.isEmpty(), o.value.isEmpty())
341+
.compare(value, o.value, ByteStringComparator.INSTANCE)
342+
// Closed start point comes before an open start point: [x,y] starts before (x,y].
343+
.compareTrueFirst(isClosed, o.isClosed)
344+
.result();
345+
}
346+
}
347+
348+
/** Helper class to ease comparison of RowRange endpoints. */
349+
private static final class EndPoint implements Comparable<EndPoint> {
350+
private final ByteString value;
351+
private final boolean isClosed;
352+
353+
@Nonnull
354+
static EndPoint extract(@Nonnull RowRange rowRange) {
355+
switch (rowRange.getEndKeyCase()) {
356+
case ENDKEY_NOT_SET:
357+
return new EndPoint(ByteString.EMPTY, true);
358+
case END_KEY_CLOSED:
359+
return new EndPoint(rowRange.getEndKeyClosed(), true);
360+
case END_KEY_OPEN:
361+
if (rowRange.getEndKeyOpen().isEmpty()) {
362+
// Take care to normalize an open empty end key to be closed.
363+
return new EndPoint(ByteString.EMPTY, true);
364+
} else {
365+
return new EndPoint(rowRange.getEndKeyOpen(), false);
366+
}
367+
default:
368+
throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase());
369+
}
370+
}
371+
372+
EndPoint(@Nonnull ByteString value, boolean isClosed) {
373+
this.value = value;
374+
this.isClosed = isClosed;
375+
}
376+
377+
@Override
378+
public int compareTo(@Nonnull EndPoint o) {
379+
return ComparisonChain.start()
380+
// Empty string comes last
381+
.compareFalseFirst(value.isEmpty(), o.value.isEmpty())
382+
.compare(value, o.value, ByteStringComparator.INSTANCE)
383+
// Open end point comes before a closed end point: [x,y) ends before [x,y].
384+
.compareFalseFirst(isClosed, o.isClosed)
385+
.result();
386+
}
387+
}
388+
}

0 commit comments

Comments
 (0)