From 1309ac32d6254b8e58a12bbe67eac77b68bb090f Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 25 Jan 2017 17:45:26 +1100 Subject: [PATCH 1/2] use ChannelProvider instead of ChannelBuilder --- .../google/cloud/pubsub/spi/v1/Publisher.java | 78 ++++++------------- .../pubsub/spi/v1/PublisherImplTest.java | 50 ++++++------ 2 files changed, 53 insertions(+), 75 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 39941ae878fa..a11271906516 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -19,11 +19,10 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.core.RetrySettings; import com.google.api.gax.grpc.BundlingSettings; +import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; -import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; @@ -35,14 +34,8 @@ import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; -import io.grpc.CallCredentials; -import io.grpc.Channel; -import io.grpc.ManagedChannelBuilder; +import io.grpc.ManagedChannel; import io.grpc.Status; -import io.grpc.auth.MoreCallCredentials; -import io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -136,9 +129,8 @@ public static long getApiMaxRequestBytes() { private final AtomicBoolean activeAlarm; private final FlowController flowController; - private final Channel[] channels; + private final ManagedChannel[] channels; private final AtomicRoundRobin channelIndex; - private final CallCredentials credentials; private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; @@ -159,37 +151,35 @@ private Publisher(Builder builder) throws IOException { messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); - int numCores = Math.max(1, Runtime.getRuntime().availableProcessors()); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { closeables.add( new AutoCloseable() { @Override - public void close() throws IOException { + public void close() { executor.shutdown(); } }); } - channels = new Channel[numCores]; - channelIndex = new AtomicRoundRobin(channels.length); - for (int i = 0; i < numCores; i++) { + channels = new ManagedChannel[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < channels.length; i++) { channels[i] = - builder.channelBuilder.isPresent() - ? builder.channelBuilder.get().build() - : NettyChannelBuilder.forAddress( - PublisherSettings.getDefaultServiceAddress(), - PublisherSettings.getDefaultServicePort()) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .executor(executor) - .build(); + builder.channelProvider.needsExecutor() + ? builder.channelProvider.getChannel(executor) + : builder.channelProvider.getChannel(); + } + if (builder.channelProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() { + for (int i = 0; i < channels.length; i++) { + channels[i].shutdown(); + } + } + }); } - credentials = - MoreCallCredentials.from( - builder.userCredentials.isPresent() - ? builder.userCredentials.get() - : GoogleCredentials.getApplicationDefault() - .createScoped(PublisherSettings.getDefaultServiceScopes())); + channelIndex = new AtomicRoundRobin(channels.length); shutdown = new AtomicBoolean(false); messagesWaiter = new MessagesWaiter(); } @@ -345,7 +335,6 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) Futures.addCallback( PublisherGrpc.newFutureStub(channels[currentChannel]) - .withCallCredentials(credentials) .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS) .publish(publishRequest.build()), new FutureCallback() { @@ -565,11 +554,7 @@ public static final class Builder { RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; - // Channels and credentials - Optional userCredentials = Optional.absent(); - Optional>> channelBuilder = - Optional.absent(); - + ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; /** Constructs a new {@link Builder} using the given topic. */ @@ -582,25 +567,12 @@ public static Builder newBuilder(TopicName topic) { } /** - * Credentials to authenticate with. - * - *

Must be properly scoped for accessing Cloud Pub/Sub APIs. - */ - public Builder setCredentials(Credentials userCredentials) { - this.userCredentials = Optional.of(Preconditions.checkNotNull(userCredentials)); - return this; - } - - /** - * ManagedChannelBuilder to use to create Channels. + * ChannelProvider to use to create Channels. * *

Must point at Cloud Pub/Sub endpoint. */ - public Builder setChannelBuilder( - ManagedChannelBuilder> channelBuilder) { - this.channelBuilder = - Optional.>>of( - Preconditions.checkNotNull(channelBuilder)); + public Builder setChannelProvider(ChannelProvider channelProvider) { + this.channelProvider = Preconditions.checkNotNull(channelProvider); return this; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index e831763702af..b9e420121285 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -25,6 +25,7 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; +import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; @@ -36,6 +37,7 @@ import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; @@ -43,6 +45,7 @@ import io.grpc.internal.ServerImpl; import io.grpc.stub.StreamObserver; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.Before; @@ -64,14 +67,33 @@ public class PublisherImplTest { private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); - private static InProcessChannelBuilder testChannelBuilder; + private static final ChannelProvider TEST_CHANNEL_PROVIDER = + new ChannelProvider() { + @Override + public boolean shouldAutoClose() { + return true; + } + + @Override + public boolean needsExecutor() { + return false; + } + + @Override + public ManagedChannel getChannel() { + return InProcessChannelBuilder.forName("test-server").build(); + } + + @Override + public ManagedChannel getChannel(Executor executor) { + throw new IllegalArgumentException("testChannelprovider doesn't need an executor"); + } + }; @Captor private ArgumentCaptor requestCaptor; private FakeScheduledExecutorService fakeExecutor; - private FakeCredentials testCredentials; - private static FakePublisherServiceImpl testPublisherServiceImpl; private static ServerImpl testServer; @@ -81,8 +103,6 @@ public static void setUpClass() throws Exception { testPublisherServiceImpl = Mockito.spy(new FakePublisherServiceImpl()); InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server"); - testChannelBuilder = InProcessChannelBuilder.forName("test-server"); - InProcessChannelBuilder.forName("publisher_test"); serverBuilder.addService(testPublisherServiceImpl); testServer = serverBuilder.build(); testServer.start(); @@ -94,7 +114,6 @@ public void setUp() throws Exception { testPublisherServiceImpl.reset(); Mockito.reset(testPublisherServiceImpl); fakeExecutor = new FakeScheduledExecutorService(); - testCredentials = new FakeCredentials(); } @AfterClass @@ -351,11 +370,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce @Test public void testPublisherGetters() throws Exception { - FakeCredentials credentials = new FakeCredentials(); - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - builder.setChannelBuilder(testChannelBuilder); - builder.setCredentials(credentials); + builder.setChannelProvider(TEST_CHANNEL_PROVIDER); builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); builder.setFailOnFlowControlLimits(true); builder.setBundlingSettings( @@ -387,7 +403,6 @@ public void testPublisherGetters() throws Exception { public void testBuilderParametersAndDefaults() { Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); assertEquals(TEST_TOPIC.toString(), builder.topic); - assertEquals(Optional.absent(), builder.channelBuilder); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); assertFalse(builder.failOnFlowControlLimits); assertEquals( @@ -400,7 +415,6 @@ public void testBuilderParametersAndDefaults() { builder.bundlingSettings.getElementCountThreshold().longValue()); assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings); assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings); - assertEquals(Optional.absent(), builder.userCredentials); } @Test @@ -408,14 +422,7 @@ public void testBuilderInvalidArguments() { Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); try { - builder.setChannelBuilder(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } - - try { - builder.setCredentials(null); + builder.setChannelProvider(null); fail("Should have thrown an IllegalArgumentException"); } catch (NullPointerException expected) { // Expected @@ -603,8 +610,7 @@ public void testBuilderInvalidArguments() { private Builder getTestPublisherBuilder() { return Publisher.Builder.newBuilder(TEST_TOPIC) - .setCredentials(testCredentials) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) - .setChannelBuilder(testChannelBuilder); + .setChannelProvider(TEST_CHANNEL_PROVIDER); } } From d23d166a0ce94bd3c3adaae71ddb95cb04b8a556 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 26 Jan 2017 11:02:37 +1100 Subject: [PATCH 2/2] pr comment --- .../java/com/google/cloud/pubsub/spi/v1/Publisher.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 075663af3063..722d26f17051 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -24,7 +24,6 @@ import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; @@ -586,9 +585,12 @@ private Builder(TopicName topic) { } /** - * ChannelProvider to use to create Channels. + * {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub + * endpoint. * - *

Must point at Cloud Pub/Sub endpoint. + *

For performance, this client benefits from having multiple channels open at once. Users + * are encouraged to provide instances of {@code ChannelProvider} that creates new channels + * instead of returning pre-initialized ones. */ public Builder setChannelProvider(ChannelProvider channelProvider) { this.channelProvider = Preconditions.checkNotNull(channelProvider);