35
35
import java .util .concurrent .ExecutorService ;
36
36
import java .util .concurrent .Executors ;
37
37
import java .util .concurrent .Future ;
38
- import java .util .concurrent .ScheduledExecutorService ;
39
- import java .util .concurrent .ScheduledThreadPoolExecutor ;
40
38
import java .util .concurrent .atomic .AtomicInteger ;
41
39
42
40
/**
@@ -46,17 +44,15 @@ final class MessageConsumerImpl implements MessageConsumer {
46
44
47
45
private static final int MAX_QUEUED_CALLBACKS = 100 ;
48
46
// shared scheduled executor, used to schedule pulls
49
- private static final SharedResourceHolder .Resource <ScheduledExecutorService > TIMER =
50
- new SharedResourceHolder .Resource <ScheduledExecutorService >() {
47
+ private static final SharedResourceHolder .Resource <ExecutorService > CONSUMER_EXECUTOR =
48
+ new SharedResourceHolder .Resource <ExecutorService >() {
51
49
@ Override
52
- public ScheduledExecutorService create () {
53
- ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor (1 );
54
- timer .setRemoveOnCancelPolicy (true );
55
- return timer ;
50
+ public ExecutorService create () {
51
+ return Executors .newSingleThreadExecutor ();
56
52
}
57
53
58
54
@ Override
59
- public void close (ScheduledExecutorService instance ) {
55
+ public void close (ExecutorService instance ) {
60
56
instance .shutdown ();
61
57
}
62
58
};
@@ -67,7 +63,7 @@ public void close(ScheduledExecutorService instance) {
67
63
private final AckDeadlineRenewer deadlineRenewer ;
68
64
private final String subscription ;
69
65
private final MessageProcessor messageProcessor ;
70
- private final ScheduledExecutorService timer ;
66
+ private final ExecutorService consumerExecutor ;
71
67
private final ExecutorFactory <ExecutorService > executorFactory ;
72
68
private final ExecutorService executor ;
73
69
private final AtomicInteger queuedCallbacks ;
@@ -192,7 +188,7 @@ private MessageConsumerImpl(Builder builder) {
192
188
this .pubsub = pubsubOptions .service ();
193
189
this .deadlineRenewer = builder .deadlineRenewer ;
194
190
this .queuedCallbacks = new AtomicInteger ();
195
- this .timer = SharedResourceHolder .get (TIMER );
191
+ this .consumerExecutor = SharedResourceHolder .get (CONSUMER_EXECUTOR );
196
192
this .executorFactory =
197
193
builder .executorFactory != null ? builder .executorFactory : new DefaultExecutorFactory ();
198
194
this .executor = executorFactory .get ();
@@ -209,7 +205,7 @@ private void pullIfNeeded() {
209
205
if (closed || scheduledFuture != null || !pullPolicy .shouldPull (queuedCallbacks .get ())) {
210
206
return ;
211
207
}
212
- scheduledFuture = timer .submit (consumerRunnable );
208
+ scheduledFuture = consumerExecutor .submit (consumerRunnable );
213
209
}
214
210
}
215
211
@@ -219,7 +215,7 @@ private void nextPull() {
219
215
scheduledFuture = null ;
220
216
return ;
221
217
}
222
- scheduledFuture = timer .submit (consumerRunnable );
218
+ scheduledFuture = consumerExecutor .submit (consumerRunnable );
223
219
}
224
220
}
225
221
@@ -237,7 +233,7 @@ public void close() {
237
233
pullerFuture .cancel (true );
238
234
}
239
235
}
240
- SharedResourceHolder .release (TIMER , timer );
236
+ SharedResourceHolder .release (CONSUMER_EXECUTOR , consumerExecutor );
241
237
executorFactory .release (executor );
242
238
}
243
239
0 commit comments