28
28
import com .google .pubsub .v1 .PubsubMessage ;
29
29
import com .google .pubsub .v1 .ReceivedMessage ;
30
30
import java .util .ArrayList ;
31
- import java .util .Collection ;
32
- import java .util .HashMap ;
31
+ import java .util .Collections ;
33
32
import java .util .HashSet ;
34
33
import java .util .Iterator ;
35
34
import java .util .List ;
36
- import java .util .Map ;
35
+ import java .util .PriorityQueue ;
37
36
import java .util .Set ;
38
37
import java .util .concurrent .ScheduledExecutorService ;
39
38
import java .util .concurrent .ScheduledFuture ;
@@ -68,8 +67,7 @@ class MessageDispatcher {
68
67
private final FlowController flowController ;
69
68
private final MessagesWaiter messagesWaiter ;
70
69
71
- // Map of outstanding messages (value) ordered by expiration time (key) in ascending order.
72
- private final Map <ExpirationInfo , List <AckHandler >> outstandingAckHandlers ;
70
+ private final PriorityQueue <ExtensionJob > outstandingAckHandlers ;
73
71
private final Set <String > pendingAcks ;
74
72
private final Set <String > pendingNacks ;
75
73
@@ -82,40 +80,43 @@ class MessageDispatcher {
82
80
// To keep track of number of seconds the receiver takes to process messages.
83
81
private final Distribution ackLatencyDistribution ;
84
82
85
- private static class ExpirationInfo implements Comparable <ExpirationInfo > {
86
- private final Clock clock ;
83
+ // ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration.
84
+ //
85
+ // It is Comparable so that it may be put in a PriorityQueue.
86
+ // For efficiency, it is also mutable, so great care should be taken to make sure
87
+ // it is not modified while inside the queue.
88
+ // The hashcode and equals methods are explicitly not implemented to discourage
89
+ // the use of this class as keys in maps or similar containers.
90
+ private static class ExtensionJob implements Comparable <ExtensionJob > {
87
91
Instant expiration ;
88
92
int nextExtensionSeconds ;
93
+ ArrayList <AckHandler > ackHandlers ;
89
94
90
- ExpirationInfo ( Clock clock , Instant expiration , int initialAckDeadlineExtension ) {
91
- this . clock = clock ;
95
+ ExtensionJob (
96
+ Instant expiration , int initialAckDeadlineExtension , ArrayList < AckHandler > ackHandlers ) {
92
97
this .expiration = expiration ;
93
98
nextExtensionSeconds = initialAckDeadlineExtension ;
99
+ this .ackHandlers = ackHandlers ;
94
100
}
95
101
96
- void extendExpiration () {
97
- expiration = new Instant ( clock . millis ()) .plus (Duration .standardSeconds (nextExtensionSeconds ));
102
+ void extendExpiration (Instant now ) {
103
+ expiration = now .plus (Duration .standardSeconds (nextExtensionSeconds ));
98
104
nextExtensionSeconds = Math .min (2 * nextExtensionSeconds , MAX_ACK_DEADLINE_EXTENSION_SECS );
99
105
}
100
106
101
107
@ Override
102
- public int hashCode ( ) {
103
- return expiration .hashCode ( );
108
+ public int compareTo ( ExtensionJob other ) {
109
+ return expiration .compareTo ( other . expiration );
104
110
}
105
111
106
- @ Override
107
- public boolean equals ( Object obj ) {
108
- if (!( obj instanceof ExpirationInfo ) ) {
109
- return false ;
112
+ public String toString () {
113
+ ArrayList < String > ackIds = new ArrayList <>();
114
+ for ( AckHandler ah : ackHandlers ) {
115
+ ackIds . add ( ah . ackId ) ;
110
116
}
111
-
112
- ExpirationInfo other = (ExpirationInfo ) obj ;
113
- return expiration .equals (other .expiration );
114
- }
115
-
116
- @ Override
117
- public int compareTo (ExpirationInfo other ) {
118
- return expiration .compareTo (other .expiration );
117
+ return String .format (
118
+ "ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}" ,
119
+ expiration , nextExtensionSeconds , ackIds );
119
120
}
120
121
}
121
122
@@ -137,6 +138,12 @@ static class PendingModifyAckDeadline {
137
138
public void addAckId (String ackId ) {
138
139
ackIds .add (ackId );
139
140
}
141
+
142
+ public String toString () {
143
+ return String .format (
144
+ "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}" ,
145
+ deadlineExtensionSeconds , ackIds );
146
+ }
140
147
}
141
148
142
149
/**
@@ -217,7 +224,7 @@ void sendAckOperations(
217
224
this .receiver = receiver ;
218
225
this .ackProcessor = ackProcessor ;
219
226
this .flowController = flowController ;
220
- outstandingAckHandlers = new HashMap <>();
227
+ outstandingAckHandlers = new PriorityQueue <>();
221
228
pendingAcks = new HashSet <>();
222
229
pendingNacks = new HashSet <>();
223
230
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
@@ -257,20 +264,14 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
257
264
}
258
265
Instant now = new Instant (clock .millis ());
259
266
int totalByteCount = 0 ;
260
- final List <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
267
+ final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
261
268
for (ReceivedMessage pubsubMessage : responseMessages ) {
262
269
int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
263
270
totalByteCount += messageSize ;
264
271
ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), messageSize ));
265
272
}
266
- ExpirationInfo expiration =
267
- new ExpirationInfo (
268
- clock , now .plus (messageDeadlineSeconds * 1000 ), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS );
269
- synchronized (outstandingAckHandlers ) {
270
- addOutstadingAckHandlers (expiration , ackHandlers );
271
- }
273
+ Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
272
274
logger .debug ("Received {} messages at {}" , responseMessages .size (), now );
273
- setupNextAckDeadlineExtensionAlarm (expiration );
274
275
275
276
messagesWaiter .incrementPendingMessages (responseMessages .size ());
276
277
Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
@@ -285,21 +286,20 @@ public void run() {
285
286
}
286
287
});
287
288
}
289
+
290
+ synchronized (outstandingAckHandlers ) {
291
+ outstandingAckHandlers .add (
292
+ new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
293
+ }
294
+ setupNextAckDeadlineExtensionAlarm (expiration );
295
+
288
296
try {
289
297
flowController .reserve (receivedMessagesCount , totalByteCount );
290
298
} catch (FlowController .FlowControlException unexpectedException ) {
291
299
throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
292
300
}
293
301
}
294
302
295
- private void addOutstadingAckHandlers (
296
- ExpirationInfo expiration , final List <AckHandler > ackHandlers ) {
297
- if (!outstandingAckHandlers .containsKey (expiration )) {
298
- outstandingAckHandlers .put (expiration , new ArrayList <AckHandler >(ackHandlers .size ()));
299
- }
300
- outstandingAckHandlers .get (expiration ).addAll (ackHandlers );
301
- }
302
-
303
303
private void setupPendingAcksAlarm () {
304
304
alarmsLock .lock ();
305
305
try {
@@ -354,41 +354,49 @@ public void run() {
354
354
now ,
355
355
cutOverTime ,
356
356
ackExpirationPadding );
357
- ExpirationInfo nextScheduleExpiration = null ;
357
+ Instant nextScheduleExpiration = null ;
358
358
List <PendingModifyAckDeadline > modifyAckDeadlinesToSend = new ArrayList <>();
359
359
360
+ // Holding area for jobs we'll put back into the queue
361
+ // so we don't process the same job twice.
362
+ List <ExtensionJob > renewJobs = new ArrayList <>();
363
+
360
364
synchronized (outstandingAckHandlers ) {
361
- for (ExpirationInfo messageExpiration : outstandingAckHandlers .keySet ()) {
362
- if (messageExpiration .expiration .compareTo (cutOverTime ) <= 0 ) {
363
- Collection <AckHandler > expiringAcks = outstandingAckHandlers .get (messageExpiration );
364
- outstandingAckHandlers .remove (messageExpiration );
365
- List <AckHandler > renewedAckHandlers = new ArrayList <>(expiringAcks .size ());
366
- messageExpiration .extendExpiration ();
367
- int extensionSeconds =
368
- Ints .saturatedCast (
369
- new Interval (now , messageExpiration .expiration )
370
- .toDuration ()
371
- .getStandardSeconds ());
372
- PendingModifyAckDeadline pendingModAckDeadline =
373
- new PendingModifyAckDeadline (extensionSeconds );
374
- for (AckHandler ackHandler : expiringAcks ) {
375
- if (ackHandler .acked .get ()) {
376
- continue ;
377
- }
378
- pendingModAckDeadline .addAckId (ackHandler .ackId );
379
- renewedAckHandlers .add (ackHandler );
380
- }
381
- modifyAckDeadlinesToSend .add (pendingModAckDeadline );
382
- if (!renewedAckHandlers .isEmpty ()) {
383
- addOutstadingAckHandlers (messageExpiration , renewedAckHandlers );
365
+ while (!outstandingAckHandlers .isEmpty ()
366
+ && outstandingAckHandlers .peek ().expiration .compareTo (cutOverTime ) <= 0 ) {
367
+ ExtensionJob job = outstandingAckHandlers .poll ();
368
+
369
+ // If a message has already been acked, remove it, nothing to do.
370
+ for (int i = 0 ; i < job .ackHandlers .size (); ) {
371
+ if (job .ackHandlers .get (i ).acked .get ()) {
372
+ Collections .swap (job .ackHandlers , i , job .ackHandlers .size () - 1 );
373
+ job .ackHandlers .remove (job .ackHandlers .size () - 1 );
384
374
} else {
385
- outstandingAckHandlers . remove ( messageExpiration ) ;
375
+ i ++ ;
386
376
}
387
377
}
388
- if ( nextScheduleExpiration == null
389
- || nextScheduleExpiration . expiration . isAfter ( messageExpiration . expiration )) {
390
- nextScheduleExpiration = messageExpiration ;
378
+
379
+ if ( job . ackHandlers . isEmpty ( )) {
380
+ continue ;
391
381
}
382
+
383
+ job .extendExpiration (now );
384
+ int extensionSeconds =
385
+ Ints .saturatedCast (
386
+ new Interval (now , job .expiration ).toDuration ().getStandardSeconds ());
387
+ PendingModifyAckDeadline pendingModAckDeadline =
388
+ new PendingModifyAckDeadline (extensionSeconds );
389
+ for (AckHandler ackHandler : job .ackHandlers ) {
390
+ pendingModAckDeadline .addAckId (ackHandler .ackId );
391
+ }
392
+ modifyAckDeadlinesToSend .add (pendingModAckDeadline );
393
+ renewJobs .add (job );
394
+ }
395
+ for (ExtensionJob job : renewJobs ) {
396
+ outstandingAckHandlers .add (job );
397
+ }
398
+ if (!outstandingAckHandlers .isEmpty ()) {
399
+ nextScheduleExpiration = outstandingAckHandlers .peek ().expiration ;
392
400
}
393
401
}
394
402
@@ -404,8 +412,8 @@ public void run() {
404
412
}
405
413
}
406
414
407
- private void setupNextAckDeadlineExtensionAlarm (ExpirationInfo messageExpiration ) {
408
- Instant possibleNextAlarmTime = messageExpiration . expiration .minus (ackExpirationPadding );
415
+ private void setupNextAckDeadlineExtensionAlarm (Instant expiration ) {
416
+ Instant possibleNextAlarmTime = expiration .minus (ackExpirationPadding );
409
417
alarmsLock .lock ();
410
418
try {
411
419
if (nextAckDeadlineExtensionAlarmTime .isAfter (possibleNextAlarmTime )) {
0 commit comments