24
24
import java .util .function .Predicate ;
25
25
26
26
import static io .nats .client .support .NatsConstants .EMPTY_BODY ;
27
+ import static io .nats .client .support .NatsConstants .OUTPUT_QUEUE_IS_FULL ;
27
28
28
29
class MessageQueue {
29
30
protected static final int STOPPED = 0 ;
@@ -35,9 +36,10 @@ class MessageQueue {
35
36
protected final AtomicInteger running ;
36
37
protected final boolean singleReaderMode ;
37
38
protected final LinkedBlockingQueue <NatsMessage > queue ;
38
- protected final Lock filterLock ;
39
+ protected final Lock editLock ;
39
40
protected final int publishHighwaterMark ;
40
41
protected final boolean discardWhenFull ;
42
+ protected final long offerLockMillis ;
41
43
protected final long offerTimeoutMillis ;
42
44
protected final Duration requestCleanupInterval ;
43
45
@@ -47,7 +49,11 @@ class MessageQueue {
47
49
protected final NatsMessage poisonPill ;
48
50
49
51
MessageQueue (boolean singleReaderMode , Duration requestCleanupInterval ) {
50
- this (singleReaderMode , -1 , false , requestCleanupInterval );
52
+ this (singleReaderMode , -1 , false , requestCleanupInterval , null );
53
+ }
54
+
55
+ MessageQueue (boolean singleReaderMode , Duration requestCleanupInterval , MessageQueue source ) {
56
+ this (singleReaderMode , -1 , false , requestCleanupInterval , source );
51
57
}
52
58
53
59
/**
@@ -61,31 +67,40 @@ class MessageQueue {
61
67
* @param requestCleanupInterval is used to figure the offerTimeoutMillis
62
68
*/
63
69
MessageQueue (boolean singleReaderMode , int publishHighwaterMark , boolean discardWhenFull , Duration requestCleanupInterval ) {
70
+ this (singleReaderMode , publishHighwaterMark , discardWhenFull , requestCleanupInterval , null );
71
+ }
72
+
73
+ MessageQueue (boolean singleReaderMode , int publishHighwaterMark , boolean discardWhenFull , Duration requestCleanupInterval , MessageQueue source ) {
64
74
this .publishHighwaterMark = publishHighwaterMark ;
65
75
this .queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue <>(publishHighwaterMark ) : new LinkedBlockingQueue <>();
66
76
this .discardWhenFull = discardWhenFull ;
67
77
this .running = new AtomicInteger (RUNNING );
68
78
this .sizeInBytes = new AtomicLong (0 );
69
79
this .length = new AtomicLong (0 );
70
- this .offerTimeoutMillis = calculateOfferTimeoutMillis (requestCleanupInterval );
80
+ this .offerLockMillis = requestCleanupInterval .toMillis ();
81
+ this .offerTimeoutMillis = Math .max (1 , requestCleanupInterval .toMillis () * 95 / 100 );
71
82
72
83
// The poisonPill is used to stop poll and accumulate when the queue is stopped
73
84
this .poisonPill = new NatsMessage ("_poison" , null , EMPTY_BODY );
74
85
75
- this . filterLock = new ReentrantLock ();
86
+ editLock = new ReentrantLock ();
76
87
77
88
this .singleReaderMode = singleReaderMode ;
78
89
this .requestCleanupInterval = requestCleanupInterval ;
90
+
91
+ if (source != null ) {
92
+ source .drainTo (this );
93
+ }
79
94
}
80
95
81
- MessageQueue (MessageQueue source ) {
82
- this ( source . singleReaderMode , source . publishHighwaterMark , source . discardWhenFull , source . requestCleanupInterval );
83
- source . queue . drainTo ( queue );
84
- length . set ( queue .size () );
85
- }
86
-
87
- private static long calculateOfferTimeoutMillis ( Duration requestCleanupInterval ) {
88
- return Math . max ( 1 , requestCleanupInterval . toMillis () * 95 / 100 );
96
+ void drainTo (MessageQueue target ) {
97
+ editLock . lock ( );
98
+ try {
99
+ queue .drainTo ( target . queue );
100
+ target . length . set ( queue . size ());
101
+ } finally {
102
+ editLock . unlock ();
103
+ }
89
104
}
90
105
91
106
boolean isSingleReaderMode () {
@@ -124,21 +139,36 @@ boolean push(NatsMessage msg) {
124
139
}
125
140
126
141
boolean push (NatsMessage msg , boolean internal ) {
127
- this .filterLock .lock ();
142
+ long start = System .currentTimeMillis ();
143
+ try {
144
+ // try to get the lock, but don't wait forever
145
+ // assuming that if we are waiting for the lock
146
+ // another push likely has the lock and
147
+ if (!editLock .tryLock (offerLockMillis , TimeUnit .MILLISECONDS )) {
148
+ throw new IllegalStateException (OUTPUT_QUEUE_IS_FULL + queue .size ());
149
+ }
150
+ }
151
+ catch (InterruptedException e ) {
152
+ return false ;
153
+ }
154
+
128
155
try {
129
- // If we aren't running, then we need to obey the filter lock
130
- // to avoid ordering problems
131
156
if (!internal && this .discardWhenFull ) {
132
157
return this .queue .offer (msg );
133
158
}
134
- if (!this .offer (msg )) {
135
- throw new IllegalStateException ("Output queue is full " + queue .size ());
159
+
160
+ long timeoutLeft = Math .max (100 , offerTimeoutMillis - (System .currentTimeMillis () - start ));
161
+
162
+ if (!this .queue .offer (msg , timeoutLeft , TimeUnit .MILLISECONDS )) {
163
+ throw new IllegalStateException (OUTPUT_QUEUE_IS_FULL + queue .size ());
136
164
}
137
165
this .sizeInBytes .getAndAdd (msg .getSizeInBytes ());
138
166
this .length .incrementAndGet ();
139
167
return true ;
168
+ } catch (InterruptedException ie ) {
169
+ return false ;
140
170
} finally {
141
- this . filterLock .unlock ();
171
+ editLock .unlock ();
142
172
}
143
173
}
144
174
@@ -154,14 +184,6 @@ void poisonTheQueue() {
154
184
}
155
185
}
156
186
157
- boolean offer (NatsMessage msg ) {
158
- try {
159
- return this .queue .offer (msg , offerTimeoutMillis , TimeUnit .MILLISECONDS );
160
- } catch (InterruptedException ie ) {
161
- return false ;
162
- }
163
- }
164
-
165
187
NatsMessage poll (Duration timeout ) throws InterruptedException {
166
188
NatsMessage msg = null ;
167
189
@@ -289,7 +311,7 @@ long sizeInBytes() {
289
311
}
290
312
291
313
void filter (Predicate <NatsMessage > p ) {
292
- this . filterLock .lock ();
314
+ editLock .lock ();
293
315
try {
294
316
if (this .isRunning ()) {
295
317
throw new IllegalStateException ("Filter is only supported when the queue is paused" );
@@ -307,7 +329,7 @@ void filter(Predicate<NatsMessage> p) {
307
329
}
308
330
this .queue .addAll (newQueue );
309
331
} finally {
310
- this . filterLock .unlock ();
332
+ editLock .unlock ();
311
333
}
312
334
}
313
- }
335
+ }
0 commit comments