Skip to content

Commit a9860b3

Browse files
igorbernstein2pongad
authored andcommitted
Bigtable: improve list tables spooler (googleapis#3639)
* Bigtable: improve list tables spooler Avoid blocking the event loop. Previously the first page would be fetched asynchronously, but all of the other pages would be fetched synchronously which would block the grpc event loop. The new implementation uses future chaining. * update async test as well * reformat
1 parent 01bec2b commit a9860b3

File tree

2 files changed

+93
-29
lines changed

2 files changed

+93
-29
lines changed

google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigtable.admin.v2;
1717

18+
import com.google.api.core.ApiAsyncFunction;
1819
import com.google.api.core.ApiFunction;
1920
import com.google.api.core.ApiFuture;
2021
import com.google.api.core.ApiFutures;
@@ -27,6 +28,7 @@
2728
import com.google.bigtable.admin.v2.InstanceName;
2829
import com.google.bigtable.admin.v2.ListTablesRequest;
2930
import com.google.bigtable.admin.v2.TableName;
31+
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage;
3032
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse;
3133
import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken;
3234
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -433,22 +435,65 @@ public ApiFuture<List<TableName>> listTablesAsync() {
433435
ListTablesRequest request = ListTablesRequest.newBuilder().setParent(instanceName.toString())
434436
.build();
435437

436-
ApiFuture<ListTablesPagedResponse> listResp =
437-
this.stub.listTablesPagedCallable().futureCall(request);
438+
// TODO(igorbernstein2): try to upstream pagination spooling or figure out a way to expose the
439+
// paginated responses while maintaining the wrapper facade.
438440

439-
return ApiFutures.transform(
440-
listResp,
441-
new ApiFunction<ListTablesPagedResponse, List<TableName>>() {
441+
// Fetch the first page.
442+
ApiFuture<ListTablesPage> firstPageFuture = ApiFutures.transform(
443+
stub.listTablesPagedCallable().futureCall(request),
444+
new ApiFunction<ListTablesPagedResponse, ListTablesPage>() {
442445
@Override
443-
public List<TableName> apply(ListTablesPagedResponse response) {
444-
List<TableName> results = Lists.newArrayList();
445-
for (com.google.bigtable.admin.v2.Table proto : response.iterateAll()) {
446+
public ListTablesPage apply(ListTablesPagedResponse response) {
447+
return response.getPage();
448+
}
449+
},
450+
MoreExecutors.directExecutor()
451+
);
452+
453+
// Fetch the rest of the pages by chaining the futures.
454+
ApiFuture<List<com.google.bigtable.admin.v2.Table>> allProtos = ApiFutures
455+
.transformAsync(
456+
firstPageFuture,
457+
new ApiAsyncFunction<ListTablesPage, List<com.google.bigtable.admin.v2.Table>>() {
458+
List<com.google.bigtable.admin.v2.Table> responseAccumulator = Lists
459+
.newArrayList();
460+
461+
@Override
462+
public ApiFuture<List<com.google.bigtable.admin.v2.Table>> apply(
463+
ListTablesPage page) {
464+
// Add all entries from the page
465+
responseAccumulator.addAll(Lists.newArrayList(page.getValues()));
466+
467+
// If this is the last page, just return the accumulated responses.
468+
if (!page.hasNextPage()) {
469+
return ApiFutures.immediateFuture(responseAccumulator);
470+
}
471+
472+
// Otherwise fetch the next page.
473+
return ApiFutures.transformAsync(
474+
page.getNextPageAsync(),
475+
this,
476+
MoreExecutors.directExecutor()
477+
);
478+
}
479+
},
480+
MoreExecutors.directExecutor()
481+
);
482+
483+
// Wrap all of the accumulated protos.
484+
return ApiFutures.transform(allProtos,
485+
new ApiFunction<List<com.google.bigtable.admin.v2.Table>, List<TableName>>() {
486+
@Override
487+
public List<TableName> apply(List<com.google.bigtable.admin.v2.Table> protos) {
488+
List<TableName> results = Lists.newArrayListWithCapacity(protos.size());
489+
for (com.google.bigtable.admin.v2.Table proto : protos) {
446490
results.add(TableName.parse(proto.getName()));
447491
}
448492
return results;
449493
}
450494
},
451-
MoreExecutors.directExecutor());
495+
MoreExecutors.directExecutor()
496+
);
452497
}
453498

454499
/**

google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.bigtable.admin.v2.ListTablesRequest;
3434
import com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest.Modification;
3535
import com.google.bigtable.admin.v2.TableName;
36+
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage;
3637
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse;
3738
import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken;
3839
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -216,34 +217,52 @@ public void testGetTable() {
216217
@Test
217218
public void testListTables() {
218219
// Setup
219-
ListTablesRequest expectedRequest = ListTablesRequest.newBuilder()
220-
.setParent(INSTANCE_NAME.toString())
221-
.build();
222-
223-
ListTablesPagedResponse expectedResponseWrapper = Mockito.mock(ListTablesPagedResponse.class);
220+
com.google.bigtable.admin.v2.ListTablesRequest expectedRequest =
221+
com.google.bigtable.admin.v2.ListTablesRequest.newBuilder()
222+
.setParent(INSTANCE_NAME.toString())
223+
.build();
224224

225-
Iterable<com.google.bigtable.admin.v2.Table> expectedResults = Lists.newArrayList(
226-
com.google.bigtable.admin.v2.Table.newBuilder()
227-
.setName(TABLE_NAME.toString() + "1")
228-
.build(),
229-
com.google.bigtable.admin.v2.Table.newBuilder()
230-
.setName(TABLE_NAME.toString() + "2")
231-
.build());
225+
// 3 Tables spread across 2 pages
226+
List<com.google.bigtable.admin.v2.Table> expectedProtos = Lists.newArrayList();
227+
for (int i = 0; i < 3; i++) {
228+
expectedProtos.add(
229+
com.google.bigtable.admin.v2.Table.newBuilder()
230+
.setName(TABLE_NAME.toString() + i)
231+
.build());
232+
}
233+
// 2 on the first page
234+
ListTablesPage page0 = Mockito.mock(ListTablesPage.class);
235+
Mockito.when(page0.getValues()).thenReturn(expectedProtos.subList(0, 2));
236+
Mockito.when(page0.getNextPageToken()).thenReturn("next-page");
237+
Mockito.when(page0.hasNextPage()).thenReturn(true);
238+
239+
// 1 on the last page
240+
ListTablesPage page1 = Mockito.mock(ListTablesPage.class);
241+
Mockito.when(page1.getValues()).thenReturn(expectedProtos.subList(2, 3));
242+
243+
// Link page0 to page1
244+
Mockito.when(page0.getNextPageAsync()).thenReturn(
245+
ApiFutures.immediateFuture(page1)
246+
);
232247

233-
Mockito.when(mockListTableCallable.futureCall(expectedRequest))
234-
.thenReturn(ApiFutures.immediateFuture(expectedResponseWrapper));
248+
// Link page to the response
249+
ListTablesPagedResponse response0 = Mockito.mock(ListTablesPagedResponse.class);
250+
Mockito.when(response0.getPage()).thenReturn(page0);
235251

236-
Mockito.when(expectedResponseWrapper.iterateAll())
237-
.thenReturn(expectedResults);
252+
Mockito.when(mockListTableCallable.futureCall(expectedRequest)).thenReturn(
253+
ApiFutures.immediateFuture(response0)
254+
);
238255

239256
// Execute
240257
List<TableName> actualResults = adminClient.listTables();
241258

242259
// Verify
243-
assertThat(actualResults).containsExactly(
244-
TableName.parse(TABLE_NAME.toString() + "1"),
245-
TableName.parse(TABLE_NAME.toString() + "2")
246-
);
260+
List<TableName> expectedResults = Lists.newArrayList();
261+
for (com.google.bigtable.admin.v2.Table expectedProto : expectedProtos) {
262+
expectedResults.add(TableName.parse(expectedProto.getName()));
263+
}
264+
265+
assertThat(actualResults).containsExactlyElementsIn(expectedResults);
247266
}
248267

249268
@Test

0 commit comments

Comments
 (0)