diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 5a86ab525c1..31456e70775 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -16,7 +16,8 @@ package io.grpc.inprocess; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; + import io.grpc.ExperimentalApi; import io.grpc.Internal; import io.grpc.internal.AbstractManagedChannelImplBuilder; @@ -28,6 +29,7 @@ import java.net.SocketAddress; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; /** * Builder for a channel that issues in-process requests. Clients identify the in-process server by @@ -65,10 +67,11 @@ public static InProcessChannelBuilder forAddress(String name, int port) { } private final String name; + private ScheduledExecutorService scheduledExecutorService; private InProcessChannelBuilder(String name) { super(new InProcessSocketAddress(name), "localhost"); - this.name = Preconditions.checkNotNull(name, "name"); + this.name = checkNotNull(name, "name"); // In-process transport should not record its traffic to the stats module. // https://github.com/grpc/grpc-java/issues/2284 setStatsRecordStartedRpcs(false); @@ -115,25 +118,44 @@ public InProcessChannelBuilder keepAliveWithoutCalls(boolean enable) { return this; } + /** + * Provides a custom scheduled executor service. + * + *

It's an optional parameter. If the user has not provided a scheduled executor service when + * the channel is built, the builder will use a static cached thread pool. + * + * @return this + * + * @since 1.11.0 + */ + public InProcessChannelBuilder scheduledExecutorService( + ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = + checkNotNull(scheduledExecutorService, "scheduledExecutorService"); + return this; + } + @Override @Internal protected ClientTransportFactory buildTransportFactory() { - return new InProcessClientTransportFactory(name); + return new InProcessClientTransportFactory(name, scheduledExecutorService); } /** * Creates InProcess transports. Exposed for internal use, as it should be private. */ - @Internal static final class InProcessClientTransportFactory implements ClientTransportFactory { private final String name; - private final ScheduledExecutorService timerService = - SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); - + private final ScheduledExecutorService timerService; + private final boolean useSharedTimer; private boolean closed; - private InProcessClientTransportFactory(String name) { + private InProcessClientTransportFactory( + String name, @Nullable ScheduledExecutorService scheduledExecutorService) { this.name = name; + useSharedTimer = scheduledExecutorService == null; + timerService = useSharedTimer + ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService; } @Override @@ -156,7 +178,9 @@ public void close() { return; } closed = true; - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService); + if (useSharedTimer) { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService); + } } } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index df94843aea3..5859c252862 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -18,14 +18,11 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransportListener; -import io.grpc.internal.SharedResourceHolder.Resource; -import io.grpc.internal.SharedResourcePool; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -47,7 +44,7 @@ static InProcessServer findServer(String name) { private final List streamTracerFactories; private ServerListener listener; private boolean shutdown; - /** Expected to be a SharedResourcePool except in testing. */ + /** Defaults to be a SharedResourcePool. */ private final ObjectPool schedulerPool; /** * Only used to make sure the scheduler has at least one reference. Since child transports can @@ -55,13 +52,6 @@ static InProcessServer findServer(String name) { */ private ScheduledExecutorService scheduler; - InProcessServer( - String name, Resource schedulerResource, - List streamTracerFactories) { - this(name, SharedResourcePool.forResource(schedulerResource), streamTracerFactories); - } - - @VisibleForTesting InProcessServer( String name, ObjectPool schedulerPool, List streamTracerFactories) { diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index 3aa889bc027..82f5452961e 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -16,14 +16,20 @@ package io.grpc.inprocess; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Preconditions; import io.grpc.ExperimentalApi; import io.grpc.ServerStreamTracer; import io.grpc.internal.AbstractServerImplBuilder; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.SharedResourcePool; import java.io.File; import java.util.List; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -86,6 +92,8 @@ public static String generateName() { } private final String name; + private ObjectPool schedulerPool = + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); private InProcessServerBuilder(String name) { this.name = Preconditions.checkNotNull(name, "name"); @@ -98,10 +106,27 @@ private InProcessServerBuilder(String name) { handshakeTimeout(Long.MAX_VALUE, TimeUnit.SECONDS); } + /** + * Provides a custom scheduled executor service. + * + *

It's an optional parameter. If the user has not provided a scheduled executor service when + * the channel is built, the builder will use a static cached thread pool. + * + * @return this + * + * @since 1.11.0 + */ + public InProcessServerBuilder scheduledExecutorService( + ScheduledExecutorService scheduledExecutorService) { + schedulerPool = new FixedObjectPool( + checkNotNull(scheduledExecutorService, "scheduledExecutorService")); + return this; + } + @Override protected InProcessServer buildTransportServer( List streamTracerFactories) { - return new InProcessServer(name, GrpcUtil.TIMER_SERVICE, streamTracerFactories); + return new InProcessServer(name, schedulerPool, streamTracerFactories); } @Override diff --git a/core/src/main/java/io/grpc/internal/FixedObjectPool.java b/core/src/main/java/io/grpc/internal/FixedObjectPool.java index f26673220b9..b5feb5fd00f 100644 --- a/core/src/main/java/io/grpc/internal/FixedObjectPool.java +++ b/core/src/main/java/io/grpc/internal/FixedObjectPool.java @@ -21,7 +21,7 @@ /** * An object pool that always returns the same instance and does nothing when returning the object. */ -final class FixedObjectPool implements ObjectPool { +public final class FixedObjectPool implements ObjectPool { private final T object; public FixedObjectPool(T object) { diff --git a/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java new file mode 100644 index 00000000000..1ba5e583a06 --- /dev/null +++ b/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; +import static org.junit.Assert.assertSame; + +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.FakeClock; +import io.grpc.internal.SharedResourceHolder; +import java.util.concurrent.ScheduledExecutorService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link InProcessChannelBuilder}. + */ +@RunWith(JUnit4.class) +public class InProcessChannelBuilderTest { + @Test + public void scheduledExecutorService_default() { + InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo"); + ClientTransportFactory clientTransportFactory = builder.buildTransportFactory(); + assertSame( + SharedResourceHolder.get(TIMER_SERVICE), + clientTransportFactory.getScheduledExecutorService()); + + SharedResourceHolder.release( + TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService()); + clientTransportFactory.close(); + } + + @Test + public void scheduledExecutorService_custom() { + InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo"); + ScheduledExecutorService scheduledExecutorService = + new FakeClock().getScheduledExecutorService(); + + InProcessChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService); + assertSame(builder, builder1); + + ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory(); + + assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService()); + + clientTransportFactory.close(); + } +} diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java index 714b557a764..df11f6b7bd6 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java @@ -16,10 +16,18 @@ package io.grpc.inprocess; +import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import io.grpc.ServerStreamTracer.Factory; +import io.grpc.internal.FakeClock; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.SharedResourcePool; +import java.util.ArrayList; +import java.util.concurrent.ScheduledExecutorService; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,4 +50,40 @@ public void generateName() { assertNotEquals(name1, name2); } + + @Test + public void scheduledExecutorService_default() { + InProcessServerBuilder builder = InProcessServerBuilder.forName("foo"); + InProcessServer server = builder.buildTransportServer(new ArrayList()); + + ObjectPool scheduledExecutorServicePool = + server.getScheduledExecutorServicePool(); + ObjectPool expectedPool = + SharedResourcePool.forResource(TIMER_SERVICE); + + ScheduledExecutorService expected = expectedPool.getObject(); + ScheduledExecutorService actual = scheduledExecutorServicePool.getObject(); + assertSame(expected, actual); + + expectedPool.returnObject(expected); + scheduledExecutorServicePool.returnObject(actual); + } + + @Test + public void scheduledExecutorService_custom() { + InProcessServerBuilder builder = InProcessServerBuilder.forName("foo"); + ScheduledExecutorService scheduledExecutorService = + new FakeClock().getScheduledExecutorService(); + + InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService); + assertSame(builder, builder1); + + InProcessServer server = builder1.buildTransportServer(new ArrayList()); + ObjectPool scheduledExecutorServicePool = + server.getScheduledExecutorServicePool(); + + assertSame(scheduledExecutorService, scheduledExecutorServicePool.getObject()); + + scheduledExecutorServicePool.returnObject(scheduledExecutorService); + } } diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java index 1bf08581485..6d196c80bcc 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java @@ -24,6 +24,7 @@ import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourcePool; import java.util.Collections; import java.util.concurrent.ScheduledExecutorService; import org.junit.Test; @@ -36,7 +37,7 @@ public class InProcessServerTest { @Test public void getPort_notStarted() throws Exception { InProcessServer s = - new InProcessServer("name", GrpcUtil.TIMER_SERVICE, + new InProcessServer("name", SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), Collections.emptyList()); Truth.assertThat(s.getPort()).isEqualTo(-1); diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 2c1d6715021..d01ce4c2852 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -20,6 +20,7 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.SharedResourcePool; import io.grpc.internal.testing.AbstractTransportTest; import java.util.List; import org.junit.runner.RunWith; @@ -34,7 +35,9 @@ public class InProcessTransportTest extends AbstractTransportTest { @Override protected InternalServer newServer(List streamTracerFactories) { - return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE, streamTracerFactories); + return new InProcessServer( + TRANSPORT_NAME, + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), streamTracerFactories); } @Override