17
17
package io .grpc .cronet ;
18
18
19
19
import static com .google .common .base .Preconditions .checkArgument ;
20
+ import static com .google .common .base .Preconditions .checkNotNull ;
20
21
import static io .grpc .internal .GrpcUtil .DEFAULT_MAX_MESSAGE_SIZE ;
21
22
22
23
import com .google .common .annotations .VisibleForTesting ;
@@ -73,6 +74,9 @@ public static CronetChannelBuilder forAddress(String name, int port) {
73
74
throw new UnsupportedOperationException ("call forAddress(String, int, CronetEngine) instead" );
74
75
}
75
76
77
+ @ Nullable
78
+ private ScheduledExecutorService scheduledExecutorService ;
79
+
76
80
private final CronetEngine cronetEngine ;
77
81
78
82
private boolean alwaysUsePut = false ;
@@ -161,12 +165,29 @@ public final CronetChannelBuilder setTrafficStatsUid(int uid) {
161
165
return this ;
162
166
}
163
167
168
+ /**
169
+ * Provides a custom scheduled executor service.
170
+ *
171
+ * <p>It's an optional parameter. If the user has not provided a scheduled executor service when
172
+ * the channel is built, the builder will use a static cached thread pool.
173
+ *
174
+ * @return this
175
+ *
176
+ * @since 1.12.0
177
+ */
178
+ public final CronetChannelBuilder scheduledExecutorService (
179
+ ScheduledExecutorService scheduledExecutorService ) {
180
+ this .scheduledExecutorService =
181
+ checkNotNull (scheduledExecutorService , "scheduledExecutorService" );
182
+ return this ;
183
+ }
184
+
164
185
@ Override
165
186
protected final ClientTransportFactory buildTransportFactory () {
166
187
return new CronetTransportFactory (
167
188
new TaggingStreamFactory (
168
189
cronetEngine , trafficStatsTagSet , trafficStatsTag , trafficStatsUidSet , trafficStatsUid ),
169
- MoreExecutors .directExecutor (),
190
+ MoreExecutors .directExecutor (), scheduledExecutorService ,
170
191
maxMessageSize ,
171
192
alwaysUsePut ,
172
193
transportTracerFactory .create ());
@@ -180,20 +201,24 @@ protected Attributes getNameResolverParams() {
180
201
181
202
@ VisibleForTesting
182
203
static class CronetTransportFactory implements ClientTransportFactory {
183
- private final ScheduledExecutorService timeoutService =
184
- SharedResourceHolder .get (GrpcUtil .TIMER_SERVICE );
204
+ private final ScheduledExecutorService timeoutService ;
185
205
private final Executor executor ;
186
206
private final int maxMessageSize ;
187
207
private final boolean alwaysUsePut ;
188
208
private final StreamBuilderFactory streamFactory ;
189
209
private final TransportTracer transportTracer ;
210
+ private final boolean usingSharedScheduler ;
190
211
191
212
private CronetTransportFactory (
192
213
StreamBuilderFactory streamFactory ,
193
214
Executor executor ,
215
+ @ Nullable ScheduledExecutorService timeoutService ,
194
216
int maxMessageSize ,
195
217
boolean alwaysUsePut ,
196
218
TransportTracer transportTracer ) {
219
+ usingSharedScheduler = timeoutService == null ;
220
+ this .timeoutService = usingSharedScheduler
221
+ ? SharedResourceHolder .get (GrpcUtil .TIMER_SERVICE ) : timeoutService ;
197
222
this .maxMessageSize = maxMessageSize ;
198
223
this .alwaysUsePut = alwaysUsePut ;
199
224
this .streamFactory = streamFactory ;
@@ -216,7 +241,9 @@ public ScheduledExecutorService getScheduledExecutorService() {
216
241
217
242
@ Override
218
243
public void close () {
219
- SharedResourceHolder .release (GrpcUtil .TIMER_SERVICE , timeoutService );
244
+ if (usingSharedScheduler ) {
245
+ SharedResourceHolder .release (GrpcUtil .TIMER_SERVICE , timeoutService );
246
+ }
220
247
}
221
248
}
222
249
0 commit comments