Skip to content

Commit 5a7a290

Browse files
committed
fix: abstract the resource a batch is using
1 parent 8cbea70 commit 5a7a290

File tree

6 files changed

+191
-31
lines changed

6 files changed

+191
-31
lines changed

gax-java/gax/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,10 @@
1313
<method>*setWaitTimeout*</method>
1414
<to>com.google.api.gax.rpc.ServerStreamingCallSettings$Builder</to>
1515
</difference>
16+
<!-- BatchingDescriptor is an InternalApi intended for google cloud libraries use only -->
17+
<difference>
18+
<differenceType>7012</differenceType>
19+
<className>com/google/api/gax/batching/BatchingDescriptor</className>
20+
<method>*</method>
21+
</difference>
1622
</differences>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import com.google.api.core.InternalApi;
33+
34+
/**
35+
* Represent the resource used by a batch including element and byte. It can also be extended to
36+
* other things to determine if adding a new element needs to be flow controlled or if the current
37+
* batch needs to be flushed.
38+
*/
39+
@InternalApi("For google-cloud-java client use only.")
40+
public interface BatchResource {
41+
42+
/** Adds the additional resource. */
43+
BatchResource add(BatchResource resource);
44+
45+
/** Returns the element count of this resource. */
46+
long getElementCount();
47+
48+
/** Returns the byte count of this resource. */
49+
long getByteCount();
50+
51+
/** Returns true if the resource is empty. */
52+
boolean isEmpty();
53+
}

gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

+36-28
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
9595
private final FlowController flowController;
9696
private final ApiCallContext callContext;
9797

98+
private final long elementThreshold;
99+
100+
private final long bytesThreshold;
101+
98102
/**
99103
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
100104
* into wrappers request and response
@@ -192,7 +196,7 @@ public BatcherImpl(
192196
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
193197
}
194198
this.flowController = flowController;
195-
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
199+
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
196200
if (batchingSettings.getDelayThreshold() != null) {
197201
long delay = batchingSettings.getDelayThreshold().toMillis();
198202
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
@@ -204,6 +208,11 @@ public BatcherImpl(
204208
}
205209
currentBatcherReference = new BatcherReference(this);
206210
this.callContext = callContext;
211+
212+
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
213+
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
214+
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
215+
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
207216
}
208217

209218
/** {@inheritDoc} */
@@ -213,7 +222,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
213222
// will only be done from a single calling thread.
214223
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");
215224

216-
long bytesSize = batchingDescriptor.countBytes(element);
225+
BatchResource newResource = batchingDescriptor.createResource(element);
217226

218227
// This is not the optimal way of throttling. It does not send out partial batches, which
219228
// means that the Batcher might not use up all the resources allowed by FlowController.
@@ -232,7 +241,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
232241
// defer it till we decide on if refactoring FlowController is necessary.
233242
Stopwatch stopwatch = Stopwatch.createStarted();
234243
try {
235-
flowController.reserve(1, bytesSize);
244+
flowController.reserve(newResource.getElementCount(), newResource.getByteCount());
236245
} catch (FlowControlException e) {
237246
// This exception will only be thrown if the FlowController is set to ThrowException behavior
238247
throw FlowControlRuntimeException.fromFlowControlException(e);
@@ -241,12 +250,15 @@ public ApiFuture<ElementResultT> add(ElementT element) {
241250

242251
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
243252
synchronized (elementLock) {
244-
currentOpenBatch.add(element, result, throttledTimeMs);
245-
}
253+
if (!currentOpenBatch.isEmpty()
254+
&& batchingDescriptor.shouldFlush(
255+
currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) {
256+
sendOutstanding();
257+
}
246258

247-
if (currentOpenBatch.hasAnyThresholdReached()) {
248-
sendOutstanding();
259+
currentOpenBatch.add(element, newResource, result, throttledTimeMs);
249260
}
261+
250262
return result;
251263
}
252264

@@ -267,7 +279,7 @@ public void sendOutstanding() {
267279
return;
268280
}
269281
accumulatedBatch = currentOpenBatch;
270-
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
282+
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
271283
}
272284

273285
// This check is for old clients that instantiated the batcher without ApiCallContext
@@ -291,7 +303,9 @@ public void sendOutstanding() {
291303
@Override
292304
public void onSuccess(ResponseT response) {
293305
try {
294-
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
306+
flowController.release(
307+
accumulatedBatch.resource.getElementCount(),
308+
accumulatedBatch.resource.getByteCount());
295309
accumulatedBatch.onBatchSuccess(response);
296310
} finally {
297311
onBatchCompletion();
@@ -301,7 +315,9 @@ public void onSuccess(ResponseT response) {
301315
@Override
302316
public void onFailure(Throwable throwable) {
303317
try {
304-
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
318+
flowController.release(
319+
accumulatedBatch.resource.getElementCount(),
320+
accumulatedBatch.resource.getByteCount());
305321
accumulatedBatch.onBatchFailure(throwable);
306322
} finally {
307323
onBatchCompletion();
@@ -412,35 +428,31 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
412428
private final BatchingRequestBuilder<ElementT, RequestT> builder;
413429
private final List<BatchEntry<ElementT, ElementResultT>> entries;
414430
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
415-
private final BatcherStats batcherStats;
416-
private final long elementThreshold;
417-
private final long bytesThreshold;
418431

419-
private long elementCounter = 0;
420-
private long byteCounter = 0;
432+
private final BatcherStats batcherStats;
421433
private long totalThrottledTimeMs = 0;
434+
private BatchResource resource;
422435

423436
private Batch(
424437
RequestT prototype,
425438
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
426-
BatchingSettings batchingSettings,
427439
BatcherStats batcherStats) {
428440
this.descriptor = descriptor;
429441
this.builder = descriptor.newRequestBuilder(prototype);
430442
this.entries = new ArrayList<>();
431-
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
432-
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
433-
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
434-
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
435443
this.batcherStats = batcherStats;
444+
this.resource = descriptor.createEmptyResource();
436445
}
437446

438-
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
447+
void add(
448+
ElementT element,
449+
BatchResource newResource,
450+
SettableApiFuture<ElementResultT> result,
451+
long throttledTimeMs) {
439452
builder.add(element);
440453
entries.add(BatchEntry.create(element, result));
441-
elementCounter++;
442-
byteCounter += descriptor.countBytes(element);
443454
totalThrottledTimeMs += throttledTimeMs;
455+
resource = resource.add(newResource);
444456
}
445457

446458
void onBatchSuccess(ResponseT response) {
@@ -464,11 +476,7 @@ void onBatchFailure(Throwable throwable) {
464476
}
465477

466478
boolean isEmpty() {
467-
return elementCounter == 0;
468-
}
469-
470-
boolean hasAnyThresholdReached() {
471-
return elementCounter >= elementThreshold || byteCounter >= bytesThreshold;
479+
return resource.isEmpty();
472480
}
473481
}
474482

gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java

+20
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,24 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response
9696

9797
/** Returns the size of the passed element object in bytes. */
9898
long countBytes(ElementT element);
99+
100+
/** Creates a new {@link BatchResource} with ElementT. */
101+
default BatchResource createResource(ElementT element) {
102+
return new DefaultBatchResource(1, countBytes(element));
103+
}
104+
105+
/** Create an empty {@link BatchResource}. */
106+
default BatchResource createEmptyResource() {
107+
return new DefaultBatchResource(0, 0);
108+
}
109+
110+
/**
111+
* Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold
112+
* and maxBytesThreshold.
113+
*/
114+
default boolean shouldFlush(
115+
BatchResource resource, long maxElementThreshold, long maxBytesThreshold) {
116+
return resource.getElementCount() > maxElementThreshold
117+
|| resource.getByteCount() > maxBytesThreshold;
118+
}
99119
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import com.google.common.base.Preconditions;
33+
34+
/**
35+
* The default implementation of {@link BatchResource} which tracks the elementCount and byteCount.
36+
*/
37+
final class DefaultBatchResource implements BatchResource {
38+
39+
private long elementCount;
40+
private long byteCount;
41+
42+
DefaultBatchResource(long elementCount, long byteCount) {
43+
this.elementCount = elementCount;
44+
this.byteCount = byteCount;
45+
}
46+
47+
@Override
48+
public BatchResource add(BatchResource resource) {
49+
Preconditions.checkArgument(
50+
resource instanceof DefaultBatchResource,
51+
"BatchResource needs to be an instance of DefaultBatchResource");
52+
this.elementCount += ((DefaultBatchResource) resource).elementCount;
53+
this.byteCount += ((DefaultBatchResource) resource).byteCount;
54+
return this;
55+
}
56+
57+
@Override
58+
public long getElementCount() {
59+
return elementCount;
60+
}
61+
62+
@Override
63+
public long getByteCount() {
64+
return byteCount;
65+
}
66+
67+
@Override
68+
public boolean isEmpty() {
69+
return elementCount == 0;
70+
}
71+
}

gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public class BatcherImplTest {
9393
BatchingSettings.newBuilder()
9494
.setElementCountThreshold(1000L)
9595
.setRequestByteThreshold(1000L)
96-
.setDelayThreshold(Duration.ofSeconds(1))
96+
.setDelayThreshold(Duration.ofSeconds(1000))
9797
.build();
9898

9999
@After
@@ -376,6 +376,7 @@ public void testWhenThresholdIsDisabled() throws Exception {
376376
.build();
377377
underTest = createDefaultBatcherImpl(settings, null);
378378
Future<Integer> result = underTest.add(2);
379+
underTest.add(3);
379380
assertThat(result.isDone()).isTrue();
380381
assertThat(result.get()).isEqualTo(4);
381382
}
@@ -895,7 +896,7 @@ public void run() {
895896

896897
// Mockito recommends using verify() as the ONLY way to interact with Argument
897898
// captors - otherwise it may incur in unexpected behaviour
898-
Mockito.verify(callContext).withOption(key.capture(), value.capture());
899+
Mockito.verify(callContext, Mockito.timeout(100)).withOption(key.capture(), value.capture());
899900

900901
// Verify that throttled time is recorded in ApiCallContext
901902
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
@@ -1012,8 +1013,9 @@ private void testElementTriggers(BatchingSettings settings) throws Exception {
10121013
underTest = createDefaultBatcherImpl(settings, null);
10131014
Future<Integer> result = underTest.add(4);
10141015
assertThat(result.isDone()).isFalse();
1015-
// After this element is added, the batch triggers sendOutstanding().
10161016
Future<Integer> anotherResult = underTest.add(5);
1017+
// After this element is added, the batch triggers sendOutstanding().
1018+
underTest.add(6);
10171019
// Both the elements should be resolved now.
10181020
assertThat(result.isDone()).isTrue();
10191021
assertThat(result.get()).isEqualTo(16);

0 commit comments

Comments
 (0)