Skip to content

pubsub: add Publisher.awaitTermination #3688

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google-cloud-clients/google-cloud-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ try {
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -46,7 +43,6 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.Status;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -424,6 +420,16 @@ public void shutdown() throws Exception {
publisherStub.shutdown();
}

/**
* Wait for all work has completed execution after a {@link #shutdown()} request, or the timeout
* occurs, or the current thread is interrupted.
*
* <p>Call this method to make sure all resources are freed properly.
*/
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return publisherStub.awaitTermination(duration, unit);
}

private boolean hasBatchingBytes() {
return getMaxBatchBytes() > 0;
}
Expand All @@ -443,6 +449,7 @@ private boolean hasBatchingBytes() {
* } finally {
* // When finished with the publisher, make sure to shutdown to free up resources.
* publisher.shutdown();
* publisher.awaitTermination(1, TimeUnit.MINUTES);
* }
* }</pre>
*/
Expand All @@ -463,6 +470,7 @@ public static Builder newBuilder(TopicName topicName) {
* } finally {
* // When finished with the publisher, make sure to shutdown to free up resources.
* publisher.shutdown();
* publisher.awaitTermination(1, TimeUnit.MINUTES);
* }
* }</pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsub.it;

import static com.google.common.truth.Truth.assertThat;

import com.google.auto.value.AutoValue;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
Expand All @@ -32,20 +34,17 @@
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static com.google.common.truth.Truth.assertThat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class ITPubSubTest {

Expand Down Expand Up @@ -147,6 +146,7 @@ public void failed(Subscriber.State from, Throwable failure) {
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build())
.get();
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

// Ack the first message.
MessageAndConsumer toAck = pollQueue(receiveQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -114,6 +115,7 @@ public void testPublishByDuration() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -152,6 +154,7 @@ public void testPublishByNumBatchedMessages() throws Exception {
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -186,6 +189,7 @@ public void testSinglePublishByNumBytes() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -228,6 +232,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

private ApiFuture<String> sendTestMessage(Publisher publisher, String data) {
Expand Down Expand Up @@ -278,6 +283,7 @@ public void testPublishFailureRetries() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test(expected = ExecutionException.class)
Expand All @@ -302,6 +308,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception {
} finally {
assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1);
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand All @@ -328,6 +335,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception {

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand All @@ -353,6 +361,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test(expected = ExecutionException.class)
Expand Down Expand Up @@ -381,6 +390,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
} finally {
assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1);
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand All @@ -403,6 +413,7 @@ public void testPublisherGetters() throws Exception {
assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold());
assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously
Expand Down Expand Up @@ -75,6 +75,7 @@ public static void publishMessages() throws Exception {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
// [END pubsub_publish]
Expand Down Expand Up @@ -123,11 +124,12 @@ public void onSuccess(String messageId) {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
// [END pubsub_publish_error_handler]
}

public static void main(String... args) throws Exception {
createTopic();
publishMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import org.threeten.bp.Duration;

import java.io.FileInputStream;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

/** This class contains snippets for the {@link Publisher} interface. */
public class PublisherSnippets {
Expand Down Expand Up @@ -78,6 +78,7 @@ public static void newBuilder(String projectId, String topicId) throws Exception
} finally {
// When finished with the publisher, make sure to shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand Down Expand Up @@ -108,8 +109,8 @@ public Publisher getPublisherWithCustomRetrySettings(ProjectTopicName topicName)
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds
Duration totalTimeout = Duration.ofSeconds(1); // default: 0
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0
Duration totalTimeout = Duration.ofSeconds(1); // default: 0
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 0

RetrySettings retrySettings = RetrySettings.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.examples.pubsub.snippets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -24,27 +27,22 @@
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.ReceivedMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class ITPubSubSnippets {

Expand Down Expand Up @@ -98,6 +96,7 @@ public void onFailure(Throwable t) {
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand Down Expand Up @@ -144,6 +143,7 @@ public void testPublisherSyncSubscriber() throws Exception {
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand Down