25
25
import com .google .api .core .BetaApi ;
26
26
import com .google .api .core .SettableApiFuture ;
27
27
import com .google .api .gax .batching .BatchingSettings ;
28
+ import com .google .api .gax .core .BackgroundResource ;
29
+ import com .google .api .gax .core .BackgroundResourceAggregation ;
28
30
import com .google .api .gax .core .CredentialsProvider ;
29
31
import com .google .api .gax .core .ExecutorAsBackgroundResource ;
30
32
import com .google .api .gax .core .ExecutorProvider ;
40
42
import com .google .cloud .pubsub .v1 .stub .PublisherStub ;
41
43
import com .google .cloud .pubsub .v1 .stub .PublisherStubSettings ;
42
44
import com .google .common .base .Preconditions ;
45
+ import com .google .common .collect .ImmutableList ;
43
46
import com .google .pubsub .v1 .PublishRequest ;
44
47
import com .google .pubsub .v1 .PublishResponse ;
45
48
import com .google .pubsub .v1 .PubsubMessage ;
@@ -86,15 +89,16 @@ public class Publisher {
86
89
private final BatchingSettings batchingSettings ;
87
90
88
91
private final Lock messagesBatchLock ;
89
- private MessagesBatch messagesBatch ;
92
+ private List <OutstandingPublish > messagesBatch ;
93
+ private int batchedBytes ;
90
94
91
95
private final AtomicBoolean activeAlarm ;
92
96
93
97
private final PublisherStub publisherStub ;
94
98
95
99
private final ScheduledExecutorService executor ;
96
100
private final AtomicBoolean shutdown ;
97
- private final List < AutoCloseable > closeables ;
101
+ private final BackgroundResource backgroundResources ;
98
102
private final MessageWaiter messagesWaiter ;
99
103
private ScheduledFuture <?> currentAlarmFuture ;
100
104
private final ApiFunction <PubsubMessage , PubsubMessage > messageTransform ;
@@ -115,15 +119,14 @@ private Publisher(Builder builder) throws IOException {
115
119
this .batchingSettings = builder .batchingSettings ;
116
120
this .messageTransform = builder .messageTransform ;
117
121
118
- messagesBatch = new MessagesBatch ();
122
+ messagesBatch = new LinkedList <> ();
119
123
messagesBatchLock = new ReentrantLock ();
120
124
activeAlarm = new AtomicBoolean (false );
121
125
executor = builder .executorProvider .getExecutor ();
126
+
127
+ List <BackgroundResource > backgroundResourceList = new ArrayList <>();
122
128
if (builder .executorProvider .shouldAutoClose ()) {
123
- closeables =
124
- Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
125
- } else {
126
- closeables = Collections .emptyList ();
129
+ backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
127
130
}
128
131
129
132
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
@@ -151,7 +154,8 @@ private Publisher(Builder builder) throws IOException {
151
154
.setRetrySettings (retrySettings )
152
155
.setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
153
156
this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
154
-
157
+ backgroundResourceList .add (publisherStub );
158
+ backgroundResources = new BackgroundResourceAggregation (backgroundResourceList );
155
159
shutdown = new AtomicBoolean (false );
156
160
messagesWaiter = new MessageWaiter ();
157
161
}
@@ -197,75 +201,96 @@ public ApiFuture<String> publish(PubsubMessage message) {
197
201
}
198
202
199
203
message = messageTransform .apply (message );
200
- List <OutstandingBatch > batchesToSend = new ArrayList <>();
201
- final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
204
+ final int messageSize = message .getSerializedSize ();
205
+ OutstandingBatch batchToSend = null ;
206
+ SettableApiFuture <String > publishResult = SettableApiFuture .<String >create ();
207
+ final OutstandingPublish outstandingPublish = new OutstandingPublish (publishResult , message );
202
208
messagesBatchLock .lock ();
203
209
try {
204
210
// Check if the next message makes the current batch exceed the max batch byte size.
205
211
if (!messagesBatch .isEmpty ()
206
212
&& hasBatchingBytes ()
207
- && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
208
- >= getMaxBatchBytes ()) {
209
- batchesToSend .add (messagesBatch .popOutstandingBatch ());
213
+ && batchedBytes + messageSize >= getMaxBatchBytes ()) {
214
+ batchToSend = new OutstandingBatch (messagesBatch , batchedBytes );
215
+ messagesBatch = new LinkedList <>();
216
+ batchedBytes = 0 ;
210
217
}
211
218
212
- messagesBatch .addMessage (outstandingPublish , outstandingPublish .messageSize );
213
-
214
- // Border case: If the message to send is greater or equals to the max batch size then send it
215
- // immediately.
216
- // Alternatively if after adding the message we have reached the batch max messages then we
217
- // have a batch to send.
218
- if ((hasBatchingBytes () && outstandingPublish .messageSize >= getMaxBatchBytes ())
219
- || messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
220
- batchesToSend .add (messagesBatch .popOutstandingBatch ());
219
+ // Border case if the message to send is greater or equals to the max batch size then can't
220
+ // be included in the current batch and instead sent immediately.
221
+ if (!hasBatchingBytes () || messageSize < getMaxBatchBytes ()) {
222
+ batchedBytes += messageSize ;
223
+ messagesBatch .add (outstandingPublish );
224
+
225
+ // If after adding the message we have reached the batch max messages then we have a batch
226
+ // to send.
227
+ if (messagesBatch .size () == getBatchingSettings ().getElementCountThreshold ()) {
228
+ batchToSend = new OutstandingBatch (messagesBatch , batchedBytes );
229
+ messagesBatch = new LinkedList <>();
230
+ batchedBytes = 0 ;
231
+ }
221
232
}
222
233
// Setup the next duration based delivery alarm if there are messages batched.
223
- setupAlarm ();
234
+ if (!messagesBatch .isEmpty ()) {
235
+ setupDurationBasedPublishAlarm ();
236
+ } else if (currentAlarmFuture != null ) {
237
+ logger .log (Level .FINER , "Cancelling alarm, no more messages" );
238
+ if (activeAlarm .getAndSet (false )) {
239
+ currentAlarmFuture .cancel (false );
240
+ }
241
+ }
224
242
} finally {
225
243
messagesBatchLock .unlock ();
226
244
}
227
245
228
246
messagesWaiter .incrementPendingMessages (1 );
229
247
230
- if (!batchesToSend .isEmpty ()) {
231
- for (final OutstandingBatch batch : batchesToSend ) {
232
- logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
233
- executor .execute (
234
- new Runnable () {
235
- @ Override
236
- public void run () {
237
- publishOutstandingBatch (batch );
238
- }
239
- });
240
- }
248
+ if (batchToSend != null ) {
249
+ logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
250
+ final OutstandingBatch finalBatchToSend = batchToSend ;
251
+ executor .execute (
252
+ new Runnable () {
253
+ @ Override
254
+ public void run () {
255
+ publishOutstandingBatch (finalBatchToSend );
256
+ }
257
+ });
258
+ }
259
+
260
+ // If the message is over the size limit, it was not added to the pending messages and it will
261
+ // be sent in its own batch immediately.
262
+ if (hasBatchingBytes () && messageSize >= getMaxBatchBytes ()) {
263
+ logger .log (
264
+ Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
265
+ executor .execute (
266
+ new Runnable () {
267
+ @ Override
268
+ public void run () {
269
+ publishOutstandingBatch (
270
+ new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize ));
271
+ }
272
+ });
241
273
}
242
274
243
- return outstandingPublish . publishResult ;
275
+ return publishResult ;
244
276
}
245
277
246
- private void setupAlarm () {
247
- if (!messagesBatch .isEmpty ()) {
248
- if (!activeAlarm .getAndSet (true )) {
249
- long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
250
- logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
251
- currentAlarmFuture =
252
- executor .schedule (
253
- new Runnable () {
254
- @ Override
255
- public void run () {
256
- logger .log (Level .FINER , "Sending messages based on schedule." );
257
- activeAlarm .getAndSet (false );
258
- publishAllOutstanding ();
259
- }
260
- },
261
- delayThresholdMs ,
262
- TimeUnit .MILLISECONDS );
263
- }
264
- } else if (currentAlarmFuture != null ) {
265
- logger .log (Level .FINER , "Cancelling alarm, no more messages" );
266
- if (activeAlarm .getAndSet (false )) {
267
- currentAlarmFuture .cancel (false );
268
- }
278
+ private void setupDurationBasedPublishAlarm () {
279
+ if (!activeAlarm .getAndSet (true )) {
280
+ long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
281
+ logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
282
+ currentAlarmFuture =
283
+ executor .schedule (
284
+ new Runnable () {
285
+ @ Override
286
+ public void run () {
287
+ logger .log (Level .FINER , "Sending messages based on schedule." );
288
+ activeAlarm .getAndSet (false );
289
+ publishAllOutstanding ();
290
+ }
291
+ },
292
+ delayThresholdMs ,
293
+ TimeUnit .MILLISECONDS );
269
294
}
270
295
}
271
296
@@ -281,25 +306,24 @@ public void publishAllOutstanding() {
281
306
if (messagesBatch .isEmpty ()) {
282
307
return ;
283
308
}
284
- batchToSend = messagesBatch .popOutstandingBatch ();
309
+ batchToSend = new OutstandingBatch (messagesBatch , batchedBytes );
310
+ messagesBatch = new LinkedList <>();
311
+ batchedBytes = 0 ;
285
312
} finally {
286
313
messagesBatchLock .unlock ();
287
314
}
288
315
publishOutstandingBatch (batchToSend );
289
316
}
290
317
291
- private ApiFuture < PublishResponse > publishCall ( OutstandingBatch outstandingBatch ) {
318
+ private void publishOutstandingBatch ( final OutstandingBatch outstandingBatch ) {
292
319
PublishRequest .Builder publishRequest = PublishRequest .newBuilder ();
293
320
publishRequest .setTopic (topicName );
294
321
for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
295
322
publishRequest .addMessages (outstandingPublish .message );
296
323
}
297
324
298
- return publisherStub .publishCallable ().futureCall (publishRequest .build ());
299
- }
300
-
301
- private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
302
- ApiFutureCallback <PublishResponse > futureCallback =
325
+ ApiFutures .addCallback (
326
+ publisherStub .publishCallable ().futureCall (publishRequest .build ()),
303
327
new ApiFutureCallback <PublishResponse >() {
304
328
@ Override
305
329
public void onSuccess (PublishResponse result ) {
@@ -338,9 +362,7 @@ public void onFailure(Throwable t) {
338
362
messagesWaiter .incrementPendingMessages (-outstandingBatch .size ());
339
363
}
340
364
}
341
- };
342
-
343
- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
365
+ });
344
366
}
345
367
346
368
private static final class OutstandingBatch {
@@ -356,18 +378,21 @@ private static final class OutstandingBatch {
356
378
this .batchSizeBytes = batchSizeBytes ;
357
379
}
358
380
359
- int size () {
381
+ public int getAttempt () {
382
+ return attempt ;
383
+ }
384
+
385
+ public int size () {
360
386
return outstandingPublishes .size ();
361
387
}
362
388
}
363
389
364
390
private static final class OutstandingPublish {
365
- final SettableApiFuture <String > publishResult ;
366
- final PubsubMessage message ;
367
- final int messageSize ;
391
+ SettableApiFuture <String > publishResult ;
392
+ PubsubMessage message ;
368
393
369
- OutstandingPublish (PubsubMessage message ) {
370
- this .publishResult = SettableApiFuture . create () ;
394
+ OutstandingPublish (SettableApiFuture < String > publishResult , PubsubMessage message ) {
395
+ this .publishResult = publishResult ;
371
396
this .message = message ;
372
397
this .messageSize = message .getSerializedSize ();
373
398
}
@@ -397,10 +422,7 @@ public void shutdown() throws Exception {
397
422
currentAlarmFuture .cancel (false );
398
423
}
399
424
publishAllOutstanding ();
400
- for (AutoCloseable closeable : closeables ) {
401
- closeable .close ();
402
- }
403
- publisherStub .shutdown ();
425
+ backgroundResources .shutdown ();
404
426
}
405
427
406
428
/**
@@ -410,37 +432,7 @@ public void shutdown() throws Exception {
410
432
* <p>Call this method to make sure all resources are freed properly.
411
433
*/
412
434
public boolean awaitTermination (long duration , TimeUnit unit ) throws InterruptedException {
413
- final long startDuration = System .currentTimeMillis ();
414
- final long totalDurationMs = TimeUnit .MILLISECONDS .convert (duration , unit );
415
- messagesWaiter .waitNoMessages ();
416
- long remainingDuration = getRemainingDuration (startDuration , totalDurationMs );
417
- boolean isAwaited =
418
- remainingDuration < totalDurationMs
419
- ? publisherStub .awaitTermination (remainingDuration , TimeUnit .MILLISECONDS )
420
- : false ;
421
- if (isAwaited ) {
422
- for (AutoCloseable closeable : closeables ) {
423
- ExecutorAsBackgroundResource executorAsBackgroundResource =
424
- (ExecutorAsBackgroundResource ) closeable ;
425
- remainingDuration = getRemainingDuration (startDuration , totalDurationMs );
426
- System .out .println (remainingDuration );
427
- isAwaited =
428
- remainingDuration < totalDurationMs
429
- ? executorAsBackgroundResource .awaitTermination (
430
- getRemainingDuration (startDuration , totalDurationMs ), TimeUnit .MILLISECONDS )
431
- : false ;
432
- if (!isAwaited ) {
433
- return false ;
434
- }
435
- }
436
- } else {
437
- return false ;
438
- }
439
- return true ;
440
- }
441
-
442
- private long getRemainingDuration (long startDuration , long totalDurationMs ) {
443
- return totalDurationMs - (System .currentTimeMillis () - startDuration );
435
+ return backgroundResources .awaitTermination (duration , unit );
444
436
}
445
437
446
438
private boolean hasBatchingBytes () {
0 commit comments