Skip to content

cronet: allow application to provide all threads #4249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.cronet;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -73,6 +74,9 @@ public static CronetChannelBuilder forAddress(String name, int port) {
throw new UnsupportedOperationException("call forAddress(String, int, CronetEngine) instead");
}

@Nullable
private ScheduledExecutorService scheduledExecutorService;

private final CronetEngine cronetEngine;

private boolean alwaysUsePut = false;
Expand Down Expand Up @@ -161,12 +165,30 @@ public final CronetChannelBuilder setTrafficStatsUid(int uid) {
return this;
}

/**
* Provides a custom scheduled executor service.
*
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
* the channel is built, the builder will use a static cached thread pool.
*
* @return this
*
* @since 1.12.0
*/
public final CronetChannelBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService =
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
return this;
}

@Override
protected final ClientTransportFactory buildTransportFactory() {
return new CronetTransportFactory(
new TaggingStreamFactory(
cronetEngine, trafficStatsTagSet, trafficStatsTag, trafficStatsUidSet, trafficStatsUid),
MoreExecutors.directExecutor(),
scheduledExecutorService,
maxMessageSize,
alwaysUsePut,
transportTracerFactory.create());
Expand All @@ -180,20 +202,24 @@ protected Attributes getNameResolverParams() {

@VisibleForTesting
static class CronetTransportFactory implements ClientTransportFactory {
private final ScheduledExecutorService timeoutService =
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private final ScheduledExecutorService timeoutService;
private final Executor executor;
private final int maxMessageSize;
private final boolean alwaysUsePut;
private final StreamBuilderFactory streamFactory;
private final TransportTracer transportTracer;
private final boolean usingSharedScheduler;

private CronetTransportFactory(
StreamBuilderFactory streamFactory,
Executor executor,
@Nullable ScheduledExecutorService timeoutService,
int maxMessageSize,
boolean alwaysUsePut,
TransportTracer transportTracer) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
this.maxMessageSize = maxMessageSize;
this.alwaysUsePut = alwaysUsePut;
this.streamFactory = streamFactory;
Expand All @@ -216,7 +242,9 @@ public ScheduledExecutorService getScheduledExecutorService() {

@Override
public void close() {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
if (usingSharedScheduler) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
}
}
}

Expand Down
33 changes: 33 additions & 0 deletions cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@

package io.grpc.cronet;

import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.cronet.CronetChannelBuilder.CronetTransportFactory;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.testing.TestMethodDescriptors;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import org.chromium.net.ExperimentalCronetEngine;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -73,4 +79,31 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {

assertFalse(stream.idempotent);
}

@Test
public void scheduledExecutorService_default() {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
assertSame(
SharedResourceHolder.get(TIMER_SERVICE),
clientTransportFactory.getScheduledExecutorService());

SharedResourceHolder.release(
TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
clientTransportFactory.close();
}

@Test
public void scheduledExecutorService_custom() {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);

CronetChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
assertSame(builder, builder1);

ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();
assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());

clientTransportFactory.close();
}
}