|
21 | 21 | import static org.junit.Assert.assertNotNull;
|
22 | 22 | import static org.junit.Assert.assertNull;
|
23 | 23 | import static org.junit.Assert.assertTrue;
|
| 24 | +import static org.junit.Assert.fail; |
24 | 25 |
|
25 | 26 | import com.google.cloud.AsyncPage;
|
26 | 27 | import com.google.cloud.Page;
|
27 | 28 | import com.google.cloud.pubsub.PubSub.MessageConsumer;
|
28 | 29 | import com.google.cloud.pubsub.PubSub.MessageProcessor;
|
| 30 | +import com.google.cloud.pubsub.PubSub.PullOption; |
29 | 31 | import com.google.common.collect.ImmutableList;
|
30 | 32 | import com.google.common.collect.Iterators;
|
31 | 33 | import com.google.common.collect.Lists;
|
32 | 34 | import com.google.common.collect.Sets;
|
33 | 35 |
|
| 36 | +import io.grpc.Status; |
| 37 | + |
34 | 38 | import org.junit.Ignore;
|
35 | 39 | import org.junit.Rule;
|
36 | 40 | import org.junit.Test;
|
@@ -545,6 +549,42 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
|
545 | 549 | assertTrue(pubsub().deleteTopic(topic));
|
546 | 550 | }
|
547 | 551 |
|
| 552 | +@Test |
| 553 | +public void testPullMessagesAsyncNotImmediately() |
| 554 | + throws ExecutionException, InterruptedException { |
| 555 | + String topic = formatForTest("test-pull-messages-not-immediately-topic"); |
| 556 | + pubsub().create(TopicInfo.of(topic)); |
| 557 | + String subscription = formatForTest("test-pull-messages-not-immediately-subscription"); |
| 558 | + pubsub().create(SubscriptionInfo.of(topic, subscription)); |
| 559 | + Future<Iterator<ReceivedMessage>> future = |
| 560 | + pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)); |
| 561 | + Message message1 = Message.of("payload1"); |
| 562 | + Message message2 = Message.of("payload2"); |
| 563 | + List<String> messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); |
| 564 | + assertEquals(2, messageIds.size()); |
| 565 | + Iterator<ReceivedMessage> iterator = future.get(); |
| 566 | + assertEquals(message1.payloadAsString(), iterator.next().payloadAsString()); |
| 567 | + assertEquals(message2.payloadAsString(), iterator.next().payloadAsString()); |
| 568 | + assertTrue(pubsub().deleteSubscription(subscription)); |
| 569 | + assertTrue(pubsub().deleteTopic(topic)); |
| 570 | +} |
| 571 | + |
| 572 | + @Test |
| 573 | + public void testPullMessagesAsyncNotImmediately_NoMessages() |
| 574 | + throws ExecutionException, InterruptedException { |
| 575 | + String topic = formatForTest("test-pull-messages-not-immediately-topic"); |
| 576 | + pubsub().create(TopicInfo.of(topic)); |
| 577 | + String subscription = formatForTest("test-pull-messages-not-immediately-subscription"); |
| 578 | + pubsub().create(SubscriptionInfo.of(topic, subscription)); |
| 579 | + try { |
| 580 | + pubsub().pullAsync(subscription, 2, PullOption.returnImmediately(false)).get(); |
| 581 | + fail("Expected timeout exception"); |
| 582 | + } catch (ExecutionException ex) { |
| 583 | + PubSubException cause = (PubSubException) ex.getCause(); |
| 584 | + assertEquals(Status.Code.DEADLINE_EXCEEDED.value(), cause.code()); |
| 585 | + } |
| 586 | + } |
| 587 | + |
548 | 588 | @Test
|
549 | 589 | public void testPullAsyncNonExistingSubscription()
|
550 | 590 | throws ExecutionException, InterruptedException {
|
|
0 commit comments