Skip to content
This repository was archived by the owner on Dec 3, 2023. It is now read-only.

Commit 827a94d

Browse files
committed
Add base service option classes for gRPC and HTTP services (#1011)
* Add base service option classes for gRPC and HTTP services * Rename ExecutorProvider to ExecutorFactory, refactor shutdown and serialization
1 parent bedad4a commit 827a94d

File tree

6 files changed

+869
-159
lines changed

6 files changed

+869
-159
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud;
18+
19+
import static com.google.common.base.MoreObjects.firstNonNull;
20+
21+
import com.google.cloud.spi.ServiceRpcFactory;
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.Preconditions;
24+
25+
import io.grpc.internal.SharedResourceHolder;
26+
import io.grpc.internal.SharedResourceHolder.Resource;
27+
28+
import java.io.IOException;
29+
import java.io.ObjectInputStream;
30+
import java.util.Objects;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.ScheduledThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
34+
35+
/**
36+
* Abstract class representing service options for those services that use gRPC as the transport
37+
* layer.
38+
*
39+
* @param <ServiceT> the service subclass
40+
* @param <ServiceRpcT> the spi-layer class corresponding to the service
41+
* @param <OptionsT> the {@code ServiceOptions} subclass corresponding to the service
42+
*/
43+
public abstract class GrpcServiceOptions<ServiceT extends Service<OptionsT>, ServiceRpcT,
44+
OptionsT extends GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT>>
45+
extends ServiceOptions<ServiceT, ServiceRpcT, OptionsT> {
46+
47+
private static final long serialVersionUID = 6415982522610509549L;
48+
private final String executorFactoryClassName;
49+
private final int initialTimeout;
50+
private final double timeoutMultiplier;
51+
private final int maxTimeout;
52+
53+
private transient ExecutorFactory executorFactory;
54+
55+
/**
56+
* Shared thread pool executor.
57+
*/
58+
private static final Resource<ScheduledExecutorService> EXECUTOR =
59+
new Resource<ScheduledExecutorService>() {
60+
@Override
61+
public ScheduledExecutorService create() {
62+
ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(8);
63+
service.setKeepAliveTime(5, TimeUnit.SECONDS);
64+
service.allowCoreThreadTimeOut(true);
65+
service.setRemoveOnCancelPolicy(true);
66+
return service;
67+
}
68+
69+
@Override
70+
public void close(ScheduledExecutorService instance) {
71+
instance.shutdown();
72+
}
73+
};
74+
75+
/**
76+
* An interface for {@link ScheduledExecutorService} factories. Implementations of this interface
77+
* can be used to provide an user-defined scheduled executor to execute requests. Any
78+
* implementation of this interface must override the {@code get()} method to return the desired
79+
* executor. The {@code release(executor)} method should be overriden to free resources used by
80+
* the executor (if needed) according to application's logic.
81+
*
82+
* <p>Implementation must provide a public no-arg constructor. Loading of a factory implementation
83+
* is done via {@link java.util.ServiceLoader}.
84+
*/
85+
public interface ExecutorFactory {
86+
87+
/**
88+
* Gets a scheduled executor service instance.
89+
*/
90+
ScheduledExecutorService get();
91+
92+
/**
93+
* Releases resources used by the executor and possibly shuts it down.
94+
*/
95+
void release(ScheduledExecutorService executor);
96+
}
97+
98+
@VisibleForTesting
99+
static class DefaultExecutorFactory implements ExecutorFactory {
100+
101+
private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory();
102+
103+
@Override
104+
public ScheduledExecutorService get() {
105+
return SharedResourceHolder.get(EXECUTOR);
106+
}
107+
108+
@Override
109+
public synchronized void release(ScheduledExecutorService executor) {
110+
SharedResourceHolder.release(EXECUTOR, executor);
111+
}
112+
}
113+
114+
/**
115+
* Builder for {@code GrpcServiceOptions}.
116+
*
117+
* @param <ServiceT> the service subclass
118+
* @param <ServiceRpcT> the spi-layer class corresponding to the service
119+
* @param <OptionsT> the {@code GrpcServiceOptions} subclass corresponding to the service
120+
* @param <B> the {@code ServiceOptions} builder
121+
*/
122+
protected abstract static class Builder<ServiceT extends Service<OptionsT>, ServiceRpcT,
123+
OptionsT extends GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT>,
124+
B extends Builder<ServiceT, ServiceRpcT, OptionsT, B>>
125+
extends ServiceOptions.Builder<ServiceT, ServiceRpcT, OptionsT, B> {
126+
127+
private ExecutorFactory executorFactory;
128+
private int initialTimeout = 20_000;
129+
private double timeoutMultiplier = 1.5;
130+
private int maxTimeout = 100_000;
131+
132+
protected Builder() {}
133+
134+
protected Builder(GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> options) {
135+
super(options);
136+
executorFactory = options.executorFactory;
137+
initialTimeout = options.initialTimeout;
138+
timeoutMultiplier = options.timeoutMultiplier;
139+
maxTimeout = options.maxTimeout;
140+
}
141+
142+
@Override
143+
protected abstract GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> build();
144+
145+
/**
146+
* Sets the scheduled executor factory. This method can be used to provide an user-defined
147+
* scheduled executor to execute requests.
148+
*
149+
* @return the builder
150+
*/
151+
public B executorFactory(ExecutorFactory executorFactory) {
152+
this.executorFactory = executorFactory;
153+
return self();
154+
}
155+
156+
/**
157+
* Sets the timeout for the initial RPC, in milliseconds. Subsequent calls will use this value
158+
* adjusted according to {@link #timeoutMultiplier(double)}. Default value is 20000.
159+
*
160+
* @throws IllegalArgumentException if the provided timeout is &lt; 0
161+
* @return the builder
162+
*/
163+
public B initialTimeout(int initialTimeout) {
164+
Preconditions.checkArgument(initialTimeout > 0, "Initial timeout must be > 0");
165+
this.initialTimeout = initialTimeout;
166+
return self();
167+
}
168+
169+
/**
170+
* Sets the timeout multiplier. This value is used to compute the timeout for a retried RPC.
171+
* Timeout is computed as {@code timeoutMultiplier * previousTimeout}. Default value is 1.5.
172+
*
173+
* @throws IllegalArgumentException if the provided timeout multiplier is &lt; 0
174+
* @return the builder
175+
*/
176+
public B timeoutMultiplier(double timeoutMultiplier) {
177+
Preconditions.checkArgument(timeoutMultiplier >= 1.0, "Timeout multiplier must be >= 1");
178+
this.timeoutMultiplier = timeoutMultiplier;
179+
return self();
180+
}
181+
182+
/**
183+
* Sets the maximum timeout for a RPC call, in milliseconds. Default value is 100000. If
184+
* {@code maxTimeout} is lower than the initial timeout the {@link #initialTimeout(int)} value
185+
* is used instead.
186+
*
187+
* @return the builder
188+
*/
189+
public B maxTimeout(int maxTimeout) {
190+
this.maxTimeout = maxTimeout;
191+
return self();
192+
}
193+
}
194+
195+
protected GrpcServiceOptions(
196+
Class<? extends ServiceFactory<ServiceT, OptionsT>> serviceFactoryClass,
197+
Class<? extends ServiceRpcFactory<ServiceRpcT, OptionsT>> rpcFactoryClass, Builder<ServiceT,
198+
ServiceRpcT, OptionsT, ?> builder) {
199+
super(serviceFactoryClass, rpcFactoryClass, builder);
200+
executorFactory = firstNonNull(builder.executorFactory,
201+
getFromServiceLoader(ExecutorFactory.class, DefaultExecutorFactory.INSTANCE));
202+
executorFactoryClassName = executorFactory.getClass().getName();
203+
initialTimeout = builder.initialTimeout;
204+
timeoutMultiplier = builder.timeoutMultiplier;
205+
maxTimeout = builder.maxTimeout <= initialTimeout ? initialTimeout : builder.maxTimeout;
206+
}
207+
208+
/**
209+
* Returns a scheduled executor service provider.
210+
*/
211+
protected ExecutorFactory executorFactory() {
212+
return executorFactory;
213+
}
214+
215+
/**
216+
* Returns the timeout for the initial RPC, in milliseconds. Subsequent calls will use this value
217+
* adjusted according to {@link #timeoutMultiplier()}. Default value is 20000.
218+
*/
219+
public int initialTimeout() {
220+
return initialTimeout;
221+
}
222+
223+
/**
224+
* Returns the timeout multiplier. This values is used to compute the timeout for a RPC. Timeout
225+
* is computed as {@code timeoutMultiplier * previousTimeout}. Default value is 1.5.
226+
*/
227+
public double timeoutMultiplier() {
228+
return timeoutMultiplier;
229+
}
230+
231+
/**
232+
* Returns the maximum timeout for a RPC call, in milliseconds. Default value is 100000.
233+
*/
234+
public int maxTimeout() {
235+
return maxTimeout;
236+
}
237+
238+
@Override
239+
protected int baseHashCode() {
240+
return Objects.hash(super.baseHashCode(), executorFactoryClassName, initialTimeout,
241+
timeoutMultiplier, maxTimeout);
242+
}
243+
244+
protected boolean baseEquals(GrpcServiceOptions<?, ?, ?> other) {
245+
return super.baseEquals(other)
246+
&& Objects.equals(executorFactoryClassName, other.executorFactoryClassName)
247+
&& Objects.equals(initialTimeout, other.initialTimeout)
248+
&& Objects.equals(timeoutMultiplier, other.timeoutMultiplier)
249+
&& Objects.equals(maxTimeout, other.maxTimeout);
250+
}
251+
252+
private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
253+
input.defaultReadObject();
254+
executorFactory = newInstance(executorFactoryClassName);
255+
}
256+
}

0 commit comments

Comments
 (0)