Skip to content

use ChannelProvider instead of ChannelBuilder #1563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
Expand All @@ -36,14 +35,8 @@
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -139,9 +132,8 @@ public static long getApiMaxRequestBytes() {
private final AtomicBoolean activeAlarm;

private final FlowController flowController;
private final Channel[] channels;
private final ManagedChannel[] channels;
private final AtomicRoundRobin channelIndex;
private final CallCredentials credentials;

private final ScheduledExecutorService executor;
private final AtomicBoolean shutdown;
Expand All @@ -164,37 +156,35 @@ private Publisher(Builder builder) throws IOException {
messagesBundle = new LinkedList<>();
messagesBundleLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
int numCores = Math.max(1, Runtime.getRuntime().availableProcessors());
executor = builder.executorProvider.getExecutor();
if (builder.executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() throws IOException {
public void close() {
executor.shutdown();
}
});
}
channels = new Channel[numCores];
channelIndex = new AtomicRoundRobin(channels.length);
for (int i = 0; i < numCores; i++) {
channels = new ManagedChannel[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < channels.length; i++) {
channels[i] =
builder.channelBuilder.isPresent()
? builder.channelBuilder.get().build()
: NettyChannelBuilder.forAddress(
PublisherSettings.getDefaultServiceAddress(),
PublisherSettings.getDefaultServicePort())
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.executor(executor)
.build();
builder.channelProvider.needsExecutor()
? builder.channelProvider.getChannel(executor)
: builder.channelProvider.getChannel();

This comment was marked as spam.

This comment was marked as spam.

}
if (builder.channelProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() {
for (int i = 0; i < channels.length; i++) {
channels[i].shutdown();
}
}
});
}
credentials =
MoreCallCredentials.from(
builder.userCredentials.isPresent()
? builder.userCredentials.get()
: GoogleCredentials.getApplicationDefault()
.createScoped(PublisherSettings.getDefaultServiceScopes()));
channelIndex = new AtomicRoundRobin(channels.length);
shutdown = new AtomicBoolean(false);
messagesWaiter = new MessagesWaiter();
}
Expand Down Expand Up @@ -350,7 +340,6 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)

Futures.addCallback(
PublisherGrpc.newFutureStub(channels[currentChannel])
.withCallCredentials(credentials)
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
.publish(publishRequest.build()),
new FutureCallback<PublishResponse>() {
Expand Down Expand Up @@ -588,37 +577,23 @@ public long nextLong(long least, long bound) {
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
LongRandom longRandom = DEFAULT_LONG_RANDOM;

// Channels and credentials
Optional<Credentials> userCredentials = Optional.absent();
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Optional.absent();

ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build();
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;

private Builder(TopicName topic) {
this.topicName = Preconditions.checkNotNull(topic);
}

/**
* Credentials to authenticate with.
*
* <p>Must be properly scoped for accessing Cloud Pub/Sub APIs.
*/
public Builder setCredentials(Credentials userCredentials) {
this.userCredentials = Optional.of(Preconditions.checkNotNull(userCredentials));
return this;
}

/**
* ManagedChannelBuilder to use to create Channels.
* {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
* endpoint.
*
* <p>Must point at Cloud Pub/Sub endpoint.
* <p>For performance, this client benefits from having multiple channels open at once. Users
* are encouraged to provide instances of {@code ChannelProvider} that creates new channels
* instead of returning pre-initialized ones.
*/
public Builder setChannelBuilder(
ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder) {
this.channelBuilder =
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(
Preconditions.checkNotNull(channelBuilder));
public Builder setChannelProvider(ChannelProvider channelProvider) {
this.channelProvider = Preconditions.checkNotNull(channelProvider);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
Expand All @@ -36,13 +37,15 @@
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ServerImpl;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
Expand All @@ -63,7 +66,28 @@ public class PublisherImplTest {
private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

private InProcessChannelBuilder testChannelBuilder;
private static final ChannelProvider TEST_CHANNEL_PROVIDER =
new ChannelProvider() {
@Override
public boolean shouldAutoClose() {
return true;
}

@Override
public boolean needsExecutor() {
return false;
}

@Override
public ManagedChannel getChannel() {
return InProcessChannelBuilder.forName("test-server").build();
}

@Override
public ManagedChannel getChannel(Executor executor) {
throw new IllegalArgumentException("testChannelprovider doesn't need an executor");
}
};

@Captor private ArgumentCaptor<PublishRequest> requestCaptor;

Expand All @@ -82,8 +106,6 @@ public void setUp() throws Exception {
testPublisherServiceImpl = Mockito.spy(new FakePublisherServiceImpl());

InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
testChannelBuilder = InProcessChannelBuilder.forName("test-server");
InProcessChannelBuilder.forName("publisher_test");
serverBuilder.addService(testPublisherServiceImpl);
testServer = serverBuilder.build();
testServer.start();
Expand All @@ -92,7 +114,6 @@ public void setUp() throws Exception {
testPublisherServiceImpl.reset();
Mockito.reset(testPublisherServiceImpl);
fakeExecutor = new FakeScheduledExecutorService();
testCredentials = new FakeCredentials();
}

@After
Expand Down Expand Up @@ -350,11 +371,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce

@Test
public void testPublisherGetters() throws Exception {
FakeCredentials credentials = new FakeCredentials();

Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
builder.setChannelBuilder(testChannelBuilder);
builder.setCredentials(credentials);
builder.setChannelProvider(TEST_CHANNEL_PROVIDER);
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
builder.setFailOnFlowControlLimits(true);
builder.setBundlingSettings(
Expand Down Expand Up @@ -386,7 +404,6 @@ public void testPublisherGetters() throws Exception {
public void testBuilderParametersAndDefaults() {
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC, builder.topicName);
assertEquals(Optional.absent(), builder.channelBuilder);
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
assertFalse(builder.failOnFlowControlLimits);
assertEquals(
Expand All @@ -399,22 +416,14 @@ public void testBuilderParametersAndDefaults() {
builder.bundlingSettings.getElementCountThreshold().longValue());
assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings);
assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
assertEquals(Optional.absent(), builder.userCredentials);
}

@Test
public void testBuilderInvalidArguments() {
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);

try {
builder.setChannelBuilder(null);
fail("Should have thrown an IllegalArgumentException");
} catch (NullPointerException expected) {
// Expected
}

try {
builder.setCredentials(null);
builder.setChannelProvider(null);
fail("Should have thrown an IllegalArgumentException");
} catch (NullPointerException expected) {
// Expected
Expand Down Expand Up @@ -602,9 +611,8 @@ public void testBuilderInvalidArguments() {

private Builder getTestPublisherBuilder() {
return Publisher.newBuilder(TEST_TOPIC)
.setCredentials(testCredentials)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setChannelBuilder(testChannelBuilder)
.setChannelProvider(TEST_CHANNEL_PROVIDER)
.setLongRandom(
new Publisher.LongRandom() {
@Override
Expand Down