diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 5a86ab525c1..31456e70775 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -16,7 +16,8 @@
package io.grpc.inprocess;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
@@ -28,6 +29,7 @@
import java.net.SocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
/**
* Builder for a channel that issues in-process requests. Clients identify the in-process server by
@@ -65,10 +67,11 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
}
private final String name;
+ private ScheduledExecutorService scheduledExecutorService;
private InProcessChannelBuilder(String name) {
super(new InProcessSocketAddress(name), "localhost");
- this.name = Preconditions.checkNotNull(name, "name");
+ this.name = checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setStatsRecordStartedRpcs(false);
@@ -115,25 +118,44 @@ public InProcessChannelBuilder keepAliveWithoutCalls(boolean enable) {
return this;
}
+ /**
+ * Provides a custom scheduled executor service.
+ *
+ *
It's an optional parameter. If the user has not provided a scheduled executor service when
+ * the channel is built, the builder will use a static cached thread pool.
+ *
+ * @return this
+ *
+ * @since 1.11.0
+ */
+ public InProcessChannelBuilder scheduledExecutorService(
+ ScheduledExecutorService scheduledExecutorService) {
+ this.scheduledExecutorService =
+ checkNotNull(scheduledExecutorService, "scheduledExecutorService");
+ return this;
+ }
+
@Override
@Internal
protected ClientTransportFactory buildTransportFactory() {
- return new InProcessClientTransportFactory(name);
+ return new InProcessClientTransportFactory(name, scheduledExecutorService);
}
/**
* Creates InProcess transports. Exposed for internal use, as it should be private.
*/
- @Internal
static final class InProcessClientTransportFactory implements ClientTransportFactory {
private final String name;
- private final ScheduledExecutorService timerService =
- SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
-
+ private final ScheduledExecutorService timerService;
+ private final boolean useSharedTimer;
private boolean closed;
- private InProcessClientTransportFactory(String name) {
+ private InProcessClientTransportFactory(
+ String name, @Nullable ScheduledExecutorService scheduledExecutorService) {
this.name = name;
+ useSharedTimer = scheduledExecutorService == null;
+ timerService = useSharedTimer
+ ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
}
@Override
@@ -156,7 +178,9 @@ public void close() {
return;
}
closed = true;
- SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
+ if (useSharedTimer) {
+ SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
+ }
}
}
}
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
index df94843aea3..5859c252862 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
@@ -18,14 +18,11 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener;
-import io.grpc.internal.SharedResourceHolder.Resource;
-import io.grpc.internal.SharedResourcePool;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -47,7 +44,7 @@ static InProcessServer findServer(String name) {
private final List streamTracerFactories;
private ServerListener listener;
private boolean shutdown;
- /** Expected to be a SharedResourcePool except in testing. */
+ /** Defaults to be a SharedResourcePool. */
private final ObjectPool schedulerPool;
/**
* Only used to make sure the scheduler has at least one reference. Since child transports can
@@ -55,13 +52,6 @@ static InProcessServer findServer(String name) {
*/
private ScheduledExecutorService scheduler;
- InProcessServer(
- String name, Resource schedulerResource,
- List streamTracerFactories) {
- this(name, SharedResourcePool.forResource(schedulerResource), streamTracerFactories);
- }
-
- @VisibleForTesting
InProcessServer(
String name, ObjectPool schedulerPool,
List streamTracerFactories) {
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index 3aa889bc027..82f5452961e 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -16,14 +16,20 @@
package io.grpc.inprocess;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
+import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ObjectPool;
+import io.grpc.internal.SharedResourcePool;
import java.io.File;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -86,6 +92,8 @@ public static String generateName() {
}
private final String name;
+ private ObjectPool schedulerPool =
+ SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
private InProcessServerBuilder(String name) {
this.name = Preconditions.checkNotNull(name, "name");
@@ -98,10 +106,27 @@ private InProcessServerBuilder(String name) {
handshakeTimeout(Long.MAX_VALUE, TimeUnit.SECONDS);
}
+ /**
+ * Provides a custom scheduled executor service.
+ *
+ * It's an optional parameter. If the user has not provided a scheduled executor service when
+ * the channel is built, the builder will use a static cached thread pool.
+ *
+ * @return this
+ *
+ * @since 1.11.0
+ */
+ public InProcessServerBuilder scheduledExecutorService(
+ ScheduledExecutorService scheduledExecutorService) {
+ schedulerPool = new FixedObjectPool(
+ checkNotNull(scheduledExecutorService, "scheduledExecutorService"));
+ return this;
+ }
+
@Override
protected InProcessServer buildTransportServer(
List streamTracerFactories) {
- return new InProcessServer(name, GrpcUtil.TIMER_SERVICE, streamTracerFactories);
+ return new InProcessServer(name, schedulerPool, streamTracerFactories);
}
@Override
diff --git a/core/src/main/java/io/grpc/internal/FixedObjectPool.java b/core/src/main/java/io/grpc/internal/FixedObjectPool.java
index f26673220b9..b5feb5fd00f 100644
--- a/core/src/main/java/io/grpc/internal/FixedObjectPool.java
+++ b/core/src/main/java/io/grpc/internal/FixedObjectPool.java
@@ -21,7 +21,7 @@
/**
* An object pool that always returns the same instance and does nothing when returning the object.
*/
-final class FixedObjectPool implements ObjectPool {
+public final class FixedObjectPool implements ObjectPool {
private final T object;
public FixedObjectPool(T object) {
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java
new file mode 100644
index 00000000000..1ba5e583a06
--- /dev/null
+++ b/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.inprocess;
+
+import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
+import static org.junit.Assert.assertSame;
+
+import io.grpc.internal.ClientTransportFactory;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.SharedResourceHolder;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link InProcessChannelBuilder}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessChannelBuilderTest {
+ @Test
+ public void scheduledExecutorService_default() {
+ InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo");
+ ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
+ assertSame(
+ SharedResourceHolder.get(TIMER_SERVICE),
+ clientTransportFactory.getScheduledExecutorService());
+
+ SharedResourceHolder.release(
+ TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
+ clientTransportFactory.close();
+ }
+
+ @Test
+ public void scheduledExecutorService_custom() {
+ InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo");
+ ScheduledExecutorService scheduledExecutorService =
+ new FakeClock().getScheduledExecutorService();
+
+ InProcessChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
+ assertSame(builder, builder1);
+
+ ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();
+
+ assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());
+
+ clientTransportFactory.close();
+ }
+}
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
index 714b557a764..df11f6b7bd6 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
@@ -16,10 +16,18 @@
package io.grpc.inprocess;
+import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import io.grpc.ServerStreamTracer.Factory;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.ObjectPool;
+import io.grpc.internal.SharedResourcePool;
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -42,4 +50,40 @@ public void generateName() {
assertNotEquals(name1, name2);
}
+
+ @Test
+ public void scheduledExecutorService_default() {
+ InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
+ InProcessServer server = builder.buildTransportServer(new ArrayList());
+
+ ObjectPool scheduledExecutorServicePool =
+ server.getScheduledExecutorServicePool();
+ ObjectPool expectedPool =
+ SharedResourcePool.forResource(TIMER_SERVICE);
+
+ ScheduledExecutorService expected = expectedPool.getObject();
+ ScheduledExecutorService actual = scheduledExecutorServicePool.getObject();
+ assertSame(expected, actual);
+
+ expectedPool.returnObject(expected);
+ scheduledExecutorServicePool.returnObject(actual);
+ }
+
+ @Test
+ public void scheduledExecutorService_custom() {
+ InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
+ ScheduledExecutorService scheduledExecutorService =
+ new FakeClock().getScheduledExecutorService();
+
+ InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
+ assertSame(builder, builder1);
+
+ InProcessServer server = builder1.buildTransportServer(new ArrayList());
+ ObjectPool scheduledExecutorServicePool =
+ server.getScheduledExecutorServicePool();
+
+ assertSame(scheduledExecutorService, scheduledExecutorServicePool.getObject());
+
+ scheduledExecutorServicePool.returnObject(scheduledExecutorService);
+ }
}
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java
index 1bf08581485..6d196c80bcc 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java
@@ -24,6 +24,7 @@
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
+import io.grpc.internal.SharedResourcePool;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
@@ -36,7 +37,7 @@ public class InProcessServerTest {
@Test
public void getPort_notStarted() throws Exception {
InProcessServer s =
- new InProcessServer("name", GrpcUtil.TIMER_SERVICE,
+ new InProcessServer("name", SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
Collections.emptyList());
Truth.assertThat(s.getPort()).isEqualTo(-1);
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
index 2c1d6715021..d01ce4c2852 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
@@ -20,6 +20,7 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
+import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.testing.AbstractTransportTest;
import java.util.List;
import org.junit.runner.RunWith;
@@ -34,7 +35,9 @@ public class InProcessTransportTest extends AbstractTransportTest {
@Override
protected InternalServer newServer(List streamTracerFactories) {
- return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE, streamTracerFactories);
+ return new InProcessServer(
+ TRANSPORT_NAME,
+ SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), streamTracerFactories);
}
@Override