38
38
39
39
import java .util .concurrent .CountDownLatch ;
40
40
import java .util .concurrent .ExecutorService ;
41
+ import java .util .concurrent .Future ;
41
42
42
43
public class MessageConsumerImplTest {
43
44
@@ -213,20 +214,33 @@ public void testMessageConsumerMultipleCallsAck() throws Exception {
213
214
PullResponse response1 = PullResponse .newBuilder ()
214
215
.addReceivedMessages (MESSAGE1_PB )
215
216
.build ();
216
- PullResponse response2 = PullResponse .newBuilder ()
217
+ final PullResponse response2 = PullResponse .newBuilder ()
217
218
.addReceivedMessages (MESSAGE2_PB )
218
219
.build ();
219
220
EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
220
221
EasyMock .expect (options .service ()).andReturn (pubsub );
221
222
EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
223
+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
222
224
final CountDownLatch latch = new CountDownLatch (2 );
223
225
EasyMock .expect (pubsub .options ()).andReturn (options );
224
- EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
226
+ EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andAnswer (new IAnswer <Future <Void >>() {
227
+ @ Override
228
+ public Future <Void > answer () throws Throwable {
229
+ nextPullLatch .await ();
230
+ return null ;
231
+ }
232
+ });
225
233
EasyMock .expect (pubsub .options ()).andReturn (options );
226
234
EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
227
235
EasyMock .replay (pubsub );
228
236
EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (response1 ));
229
- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (response2 ));
237
+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
238
+ @ Override
239
+ public PullFuture answer () throws Throwable {
240
+ nextPullLatch .countDown ();
241
+ return new TestPullFuture (response2 );
242
+ }
243
+ });
230
244
EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
231
245
.andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
232
246
renewer .add (SUBSCRIPTION , ACK_ID1 );
@@ -253,20 +267,33 @@ public void testMessageConsumerMultipleCallsNack() throws Exception {
253
267
PullResponse response1 = PullResponse .newBuilder ()
254
268
.addReceivedMessages (MESSAGE1_PB )
255
269
.build ();
256
- PullResponse response2 = PullResponse .newBuilder ()
270
+ final PullResponse response2 = PullResponse .newBuilder ()
257
271
.addReceivedMessages (MESSAGE2_PB )
258
272
.build ();
259
273
EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
260
274
EasyMock .expect (options .service ()).andReturn (pubsub );
261
275
EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
276
+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
262
277
final CountDownLatch latch = new CountDownLatch (2 );
263
278
EasyMock .expect (pubsub .options ()).andReturn (options );
264
- EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
279
+ EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andAnswer (new IAnswer <Future <Void >>() {
280
+ @ Override
281
+ public Future <Void > answer () throws Throwable {
282
+ nextPullLatch .await ();
283
+ return null ;
284
+ }
285
+ });
265
286
EasyMock .expect (pubsub .options ()).andReturn (options );
266
287
EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
267
288
EasyMock .replay (pubsub );
268
289
EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (response1 ));
269
- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (response2 ));
290
+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
291
+ @ Override
292
+ public PullFuture answer () throws Throwable {
293
+ nextPullLatch .countDown ();
294
+ return new TestPullFuture (response2 );
295
+ }
296
+ });
270
297
EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
271
298
.andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
272
299
renewer .add (SUBSCRIPTION , ACK_ID1 );
@@ -289,22 +316,35 @@ public void testMessageConsumerMultipleCallsNack() throws Exception {
289
316
@ Test
290
317
public void testMessageConsumerMaxCallbacksAck () throws Exception {
291
318
PullRequest request1 = pullRequest (2 );
292
- PullRequest request2 = pullRequest (2 );
293
- PullResponse otherPullResponse = PullResponse .newBuilder ()
319
+ PullRequest request2 = pullRequest (1 );
320
+ final PullResponse otherPullResponse = PullResponse .newBuilder ()
294
321
.addReceivedMessages (MESSAGE1_PB )
295
322
.build ();
296
323
EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
297
324
EasyMock .expect (options .service ()).andReturn (pubsub );
298
325
EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
299
326
EasyMock .expect (pubsub .options ()).andReturn (options ).times (2 );
327
+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
300
328
final CountDownLatch latch = new CountDownLatch (3 );
301
329
EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
302
- EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
330
+ EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID2 )).andAnswer (new IAnswer <Future <Void >>() {
331
+ @ Override
332
+ public Future <Void > answer () throws Throwable {
333
+ nextPullLatch .await ();
334
+ return null ;
335
+ }
336
+ });
303
337
EasyMock .expect (pubsub .options ()).andReturn (options );
304
338
EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
305
339
EasyMock .replay (pubsub );
306
340
EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (PULL_RESPONSE ));
307
- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (otherPullResponse ));
341
+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
342
+ @ Override
343
+ public PullFuture answer () throws Throwable {
344
+ nextPullLatch .countDown ();
345
+ return new TestPullFuture (otherPullResponse );
346
+ }
347
+ });
308
348
EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
309
349
.andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
310
350
renewer .add (SUBSCRIPTION , ACK_ID1 );
@@ -331,22 +371,35 @@ public void testMessageConsumerMaxCallbacksAck() throws Exception {
331
371
@ Test
332
372
public void testMessageConsumerMaxCallbacksNack () throws Exception {
333
373
PullRequest request1 = pullRequest (2 );
334
- PullRequest request2 = pullRequest (2 );
335
- PullResponse otherPullResponse = PullResponse .newBuilder ()
374
+ PullRequest request2 = pullRequest (1 );
375
+ final PullResponse otherPullResponse = PullResponse .newBuilder ()
336
376
.addReceivedMessages (MESSAGE1_PB )
337
377
.build ();
338
378
EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
339
379
EasyMock .expect (options .service ()).andReturn (pubsub );
340
380
EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
341
381
EasyMock .expect (pubsub .options ()).andReturn (options ).times (2 );
382
+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
342
383
final CountDownLatch latch = new CountDownLatch (3 );
343
384
EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
344
- EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
385
+ EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID2 )).andAnswer (new IAnswer <Future <Void >>() {
386
+ @ Override
387
+ public Future <Void > answer () throws Throwable {
388
+ nextPullLatch .await ();
389
+ return null ;
390
+ }
391
+ });
345
392
EasyMock .expect (pubsub .options ()).andReturn (options );
346
393
EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
347
394
EasyMock .replay (pubsub );
348
395
EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (PULL_RESPONSE ));
349
- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (otherPullResponse ));
396
+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
397
+ @ Override
398
+ public PullFuture answer () throws Throwable {
399
+ nextPullLatch .countDown ();
400
+ return new TestPullFuture (otherPullResponse );
401
+ }
402
+ });
350
403
EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
351
404
.andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
352
405
renewer .add (SUBSCRIPTION , ACK_ID1 );
0 commit comments