Skip to content

Commit 39decad

Browse files
authored
core: allow application to provide all threads - inprocess channel
1 parent 00d1805 commit 39decad

File tree

8 files changed

+174
-24
lines changed

8 files changed

+174
-24
lines changed

core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
package io.grpc.inprocess;
1818

19-
import com.google.common.base.Preconditions;
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
2021
import io.grpc.ExperimentalApi;
2122
import io.grpc.Internal;
2223
import io.grpc.internal.AbstractManagedChannelImplBuilder;
@@ -28,6 +29,7 @@
2829
import java.net.SocketAddress;
2930
import java.util.concurrent.ScheduledExecutorService;
3031
import java.util.concurrent.TimeUnit;
32+
import javax.annotation.Nullable;
3133

3234
/**
3335
* Builder for a channel that issues in-process requests. Clients identify the in-process server by
@@ -65,10 +67,11 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
6567
}
6668

6769
private final String name;
70+
private ScheduledExecutorService scheduledExecutorService;
6871

6972
private InProcessChannelBuilder(String name) {
7073
super(new InProcessSocketAddress(name), "localhost");
71-
this.name = Preconditions.checkNotNull(name, "name");
74+
this.name = checkNotNull(name, "name");
7275
// In-process transport should not record its traffic to the stats module.
7376
// https://github.com/grpc/grpc-java/issues/2284
7477
setStatsRecordStartedRpcs(false);
@@ -115,25 +118,44 @@ public InProcessChannelBuilder keepAliveWithoutCalls(boolean enable) {
115118
return this;
116119
}
117120

121+
/**
122+
* Provides a custom scheduled executor service.
123+
*
124+
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
125+
* the channel is built, the builder will use a static cached thread pool.
126+
*
127+
* @return this
128+
*
129+
* @since 1.11.0
130+
*/
131+
public InProcessChannelBuilder scheduledExecutorService(
132+
ScheduledExecutorService scheduledExecutorService) {
133+
this.scheduledExecutorService =
134+
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
135+
return this;
136+
}
137+
118138
@Override
119139
@Internal
120140
protected ClientTransportFactory buildTransportFactory() {
121-
return new InProcessClientTransportFactory(name);
141+
return new InProcessClientTransportFactory(name, scheduledExecutorService);
122142
}
123143

124144
/**
125145
* Creates InProcess transports. Exposed for internal use, as it should be private.
126146
*/
127-
@Internal
128147
static final class InProcessClientTransportFactory implements ClientTransportFactory {
129148
private final String name;
130-
private final ScheduledExecutorService timerService =
131-
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
132-
149+
private final ScheduledExecutorService timerService;
150+
private final boolean useSharedTimer;
133151
private boolean closed;
134152

135-
private InProcessClientTransportFactory(String name) {
153+
private InProcessClientTransportFactory(
154+
String name, @Nullable ScheduledExecutorService scheduledExecutorService) {
136155
this.name = name;
156+
useSharedTimer = scheduledExecutorService == null;
157+
timerService = useSharedTimer
158+
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
137159
}
138160

139161
@Override
@@ -156,7 +178,9 @@ public void close() {
156178
return;
157179
}
158180
closed = true;
159-
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
181+
if (useSharedTimer) {
182+
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
183+
}
160184
}
161185
}
162186
}

core/src/main/java/io/grpc/inprocess/InProcessServer.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,11 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

21-
import com.google.common.annotations.VisibleForTesting;
2221
import io.grpc.ServerStreamTracer;
2322
import io.grpc.internal.InternalServer;
2423
import io.grpc.internal.ObjectPool;
2524
import io.grpc.internal.ServerListener;
2625
import io.grpc.internal.ServerTransportListener;
27-
import io.grpc.internal.SharedResourceHolder.Resource;
28-
import io.grpc.internal.SharedResourcePool;
2926
import java.io.IOException;
3027
import java.util.Collections;
3128
import java.util.List;
@@ -47,21 +44,14 @@ static InProcessServer findServer(String name) {
4744
private final List<ServerStreamTracer.Factory> streamTracerFactories;
4845
private ServerListener listener;
4946
private boolean shutdown;
50-
/** Expected to be a SharedResourcePool except in testing. */
47+
/** Defaults to be a SharedResourcePool. */
5148
private final ObjectPool<ScheduledExecutorService> schedulerPool;
5249
/**
5350
* Only used to make sure the scheduler has at least one reference. Since child transports can
5451
* outlive this server, they must get their own reference.
5552
*/
5653
private ScheduledExecutorService scheduler;
5754

58-
InProcessServer(
59-
String name, Resource<ScheduledExecutorService> schedulerResource,
60-
List<ServerStreamTracer.Factory> streamTracerFactories) {
61-
this(name, SharedResourcePool.forResource(schedulerResource), streamTracerFactories);
62-
}
63-
64-
@VisibleForTesting
6555
InProcessServer(
6656
String name, ObjectPool<ScheduledExecutorService> schedulerPool,
6757
List<ServerStreamTracer.Factory> streamTracerFactories) {

core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,20 @@
1616

1717
package io.grpc.inprocess;
1818

19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
1921
import com.google.common.base.Preconditions;
2022
import io.grpc.ExperimentalApi;
2123
import io.grpc.ServerStreamTracer;
2224
import io.grpc.internal.AbstractServerImplBuilder;
25+
import io.grpc.internal.FixedObjectPool;
2326
import io.grpc.internal.GrpcUtil;
27+
import io.grpc.internal.ObjectPool;
28+
import io.grpc.internal.SharedResourcePool;
2429
import java.io.File;
2530
import java.util.List;
2631
import java.util.UUID;
32+
import java.util.concurrent.ScheduledExecutorService;
2733
import java.util.concurrent.TimeUnit;
2834

2935
/**
@@ -86,6 +92,8 @@ public static String generateName() {
8692
}
8793

8894
private final String name;
95+
private ObjectPool<ScheduledExecutorService> schedulerPool =
96+
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
8997

9098
private InProcessServerBuilder(String name) {
9199
this.name = Preconditions.checkNotNull(name, "name");
@@ -98,10 +106,27 @@ private InProcessServerBuilder(String name) {
98106
handshakeTimeout(Long.MAX_VALUE, TimeUnit.SECONDS);
99107
}
100108

109+
/**
110+
* Provides a custom scheduled executor service.
111+
*
112+
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
113+
* the channel is built, the builder will use a static cached thread pool.
114+
*
115+
* @return this
116+
*
117+
* @since 1.11.0
118+
*/
119+
public InProcessServerBuilder scheduledExecutorService(
120+
ScheduledExecutorService scheduledExecutorService) {
121+
schedulerPool = new FixedObjectPool<ScheduledExecutorService>(
122+
checkNotNull(scheduledExecutorService, "scheduledExecutorService"));
123+
return this;
124+
}
125+
101126
@Override
102127
protected InProcessServer buildTransportServer(
103128
List<ServerStreamTracer.Factory> streamTracerFactories) {
104-
return new InProcessServer(name, GrpcUtil.TIMER_SERVICE, streamTracerFactories);
129+
return new InProcessServer(name, schedulerPool, streamTracerFactories);
105130
}
106131

107132
@Override

core/src/main/java/io/grpc/internal/FixedObjectPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
/**
2222
* An object pool that always returns the same instance and does nothing when returning the object.
2323
*/
24-
final class FixedObjectPool<T> implements ObjectPool<T> {
24+
public final class FixedObjectPool<T> implements ObjectPool<T> {
2525
private final T object;
2626

2727
public FixedObjectPool(T object) {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2018, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.inprocess;
18+
19+
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
20+
import static org.junit.Assert.assertSame;
21+
22+
import io.grpc.internal.ClientTransportFactory;
23+
import io.grpc.internal.FakeClock;
24+
import io.grpc.internal.SharedResourceHolder;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.JUnit4;
29+
30+
/**
31+
* Unit tests for {@link InProcessChannelBuilder}.
32+
*/
33+
@RunWith(JUnit4.class)
34+
public class InProcessChannelBuilderTest {
35+
@Test
36+
public void scheduledExecutorService_default() {
37+
InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo");
38+
ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
39+
assertSame(
40+
SharedResourceHolder.get(TIMER_SERVICE),
41+
clientTransportFactory.getScheduledExecutorService());
42+
43+
SharedResourceHolder.release(
44+
TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
45+
clientTransportFactory.close();
46+
}
47+
48+
@Test
49+
public void scheduledExecutorService_custom() {
50+
InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo");
51+
ScheduledExecutorService scheduledExecutorService =
52+
new FakeClock().getScheduledExecutorService();
53+
54+
InProcessChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
55+
assertSame(builder, builder1);
56+
57+
ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();
58+
59+
assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());
60+
61+
clientTransportFactory.close();
62+
}
63+
}

core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,18 @@
1616

1717
package io.grpc.inprocess;
1818

19+
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
1920
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertNotEquals;
2122
import static org.junit.Assert.assertNotNull;
23+
import static org.junit.Assert.assertSame;
2224

25+
import io.grpc.ServerStreamTracer.Factory;
26+
import io.grpc.internal.FakeClock;
27+
import io.grpc.internal.ObjectPool;
28+
import io.grpc.internal.SharedResourcePool;
29+
import java.util.ArrayList;
30+
import java.util.concurrent.ScheduledExecutorService;
2331
import org.junit.Test;
2432
import org.junit.runner.RunWith;
2533
import org.junit.runners.JUnit4;
@@ -42,4 +50,40 @@ public void generateName() {
4250

4351
assertNotEquals(name1, name2);
4452
}
53+
54+
@Test
55+
public void scheduledExecutorService_default() {
56+
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
57+
InProcessServer server = builder.buildTransportServer(new ArrayList<Factory>());
58+
59+
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
60+
server.getScheduledExecutorServicePool();
61+
ObjectPool<ScheduledExecutorService> expectedPool =
62+
SharedResourcePool.forResource(TIMER_SERVICE);
63+
64+
ScheduledExecutorService expected = expectedPool.getObject();
65+
ScheduledExecutorService actual = scheduledExecutorServicePool.getObject();
66+
assertSame(expected, actual);
67+
68+
expectedPool.returnObject(expected);
69+
scheduledExecutorServicePool.returnObject(actual);
70+
}
71+
72+
@Test
73+
public void scheduledExecutorService_custom() {
74+
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
75+
ScheduledExecutorService scheduledExecutorService =
76+
new FakeClock().getScheduledExecutorService();
77+
78+
InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
79+
assertSame(builder, builder1);
80+
81+
InProcessServer server = builder1.buildTransportServer(new ArrayList<Factory>());
82+
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
83+
server.getScheduledExecutorServicePool();
84+
85+
assertSame(scheduledExecutorService, scheduledExecutorServicePool.getObject());
86+
87+
scheduledExecutorServicePool.returnObject(scheduledExecutorService);
88+
}
4589
}

core/src/test/java/io/grpc/inprocess/InProcessServerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.internal.ServerListener;
2525
import io.grpc.internal.ServerTransport;
2626
import io.grpc.internal.ServerTransportListener;
27+
import io.grpc.internal.SharedResourcePool;
2728
import java.util.Collections;
2829
import java.util.concurrent.ScheduledExecutorService;
2930
import org.junit.Test;
@@ -36,7 +37,7 @@ public class InProcessServerTest {
3637
@Test
3738
public void getPort_notStarted() throws Exception {
3839
InProcessServer s =
39-
new InProcessServer("name", GrpcUtil.TIMER_SERVICE,
40+
new InProcessServer("name", SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
4041
Collections.<ServerStreamTracer.Factory>emptyList());
4142

4243
Truth.assertThat(s.getPort()).isEqualTo(-1);

core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.grpc.internal.GrpcUtil;
2121
import io.grpc.internal.InternalServer;
2222
import io.grpc.internal.ManagedClientTransport;
23+
import io.grpc.internal.SharedResourcePool;
2324
import io.grpc.internal.testing.AbstractTransportTest;
2425
import java.util.List;
2526
import org.junit.runner.RunWith;
@@ -34,7 +35,9 @@ public class InProcessTransportTest extends AbstractTransportTest {
3435

3536
@Override
3637
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
37-
return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE, streamTracerFactories);
38+
return new InProcessServer(
39+
TRANSPORT_NAME,
40+
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), streamTracerFactories);
3841
}
3942

4043
@Override

0 commit comments

Comments
 (0)