Skip to content

Commit a76dd02

Browse files
krzysiekigryscottf
andauthored
feat(nats-connection): implement named executor thread factories (#1254)
* feat(nats-connection): implement named executor thread factories * Refactor executor creation to use configurable thread factories PR#1254 * Fix executor tests PR#1254 --------- Co-authored-by: Scott Fauerbach <[email protected]>
1 parent a7f40d8 commit a76dd02

File tree

3 files changed

+79
-6
lines changed

3 files changed

+79
-6
lines changed

src/main/java/io/nats/client/Options.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.*;
3939
import java.util.concurrent.*;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.function.Supplier;
4142

4243
import static io.nats.client.support.Encoding.*;
4344
import static io.nats.client.support.NatsConstants.*;
@@ -226,6 +227,11 @@ public class Options {
226227
*/
227228
public static final boolean DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL = false;
228229

230+
/**
231+
* Default supplier for creating a single-threaded executor service.
232+
*/
233+
public static final Supplier<ExecutorService> DEFAULT_SINGLE_THREAD_EXECUTOR = Executors::newSingleThreadExecutor;
234+
229235
// ----------------------------------------------------------------------------------------------------
230236
// ENVIRONMENT PROPERTIES
231237
// ----------------------------------------------------------------------------------------------------
@@ -644,6 +650,8 @@ public class Options {
644650
private final boolean traceConnection;
645651

646652
private final ExecutorService executor;
653+
private final ThreadFactory connectThreadFactory;
654+
private final ThreadFactory callbackThreadFactory;
647655
private final ServerPool serverPool;
648656
private final DispatcherFactory dispatcherFactory;
649657

@@ -759,6 +767,8 @@ public static class Builder {
759767
private StatisticsCollector statisticsCollector = null;
760768
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
761769
private ExecutorService executor;
770+
private ThreadFactory connectThreadFactory;
771+
private ThreadFactory callbackThreadFactory;
762772
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
763773
private Proxy proxy;
764774

@@ -1553,6 +1563,28 @@ public Builder executor(ExecutorService executor) {
15531563
return this;
15541564
}
15551565

1566+
/**
1567+
* Sets custom thread factory for the executor service
1568+
*
1569+
* @param threadFactory the thread factory to use for the executor service
1570+
* @return the Builder for chaining
1571+
*/
1572+
public Builder connectThreadFactory(ThreadFactory threadFactory) {
1573+
this.connectThreadFactory = threadFactory;
1574+
return this;
1575+
}
1576+
1577+
/**
1578+
* Sets custom thread factory for the executor service
1579+
*
1580+
* @param threadFactory the thread factory to use for the executor service
1581+
* @return the Builder for chaining
1582+
*/
1583+
public Builder callbackThreadFactory(ThreadFactory threadFactory) {
1584+
this.callbackThreadFactory = threadFactory;
1585+
return this;
1586+
}
1587+
15561588
/**
15571589
* Add an HttpRequest interceptor which can be used to modify the HTTP request when using websockets
15581590
*
@@ -1914,6 +1946,8 @@ public Builder(Options o) {
19141946
this.dataPortType = o.dataPortType;
19151947
this.trackAdvancedStats = o.trackAdvancedStats;
19161948
this.executor = o.executor;
1949+
this.callbackThreadFactory = o.callbackThreadFactory;
1950+
this.connectThreadFactory = o.connectThreadFactory;
19171951
this.httpRequestInterceptors = o.httpRequestInterceptors;
19181952
this.proxy = o.proxy;
19191953

@@ -1979,6 +2013,8 @@ private Options(Builder b) {
19792013
this.dataPortType = b.dataPortType;
19802014
this.trackAdvancedStats = b.trackAdvancedStats;
19812015
this.executor = b.executor;
2016+
this.callbackThreadFactory = b.callbackThreadFactory;
2017+
this.connectThreadFactory = b.connectThreadFactory;
19822018
this.httpRequestInterceptors = b.httpRequestInterceptors;
19832019
this.proxy = b.proxy;
19842020

@@ -2002,6 +2038,22 @@ public ExecutorService getExecutor() {
20022038
return this.executor;
20032039
}
20042040

2041+
/**
2042+
* @return the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
2043+
*/
2044+
public ExecutorService getCallbackExecutor() {
2045+
return this.callbackThreadFactory == null ?
2046+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
2047+
}
2048+
2049+
/**
2050+
* @return the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
2051+
*/
2052+
public ExecutorService getConnectExecutor() {
2053+
return this.connectThreadFactory == null ?
2054+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
2055+
}
2056+
20052057
/**
20062058
* @return the list of HttpRequest interceptors.
20072059
*/

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ class NatsConnection implements Connection {
157157

158158
timeTraceLogger.trace("creating executors");
159159
this.executor = options.getExecutor();
160-
this.callbackRunner = Executors.newSingleThreadExecutor();
161-
this.connectExecutor = Executors.newSingleThreadExecutor();
160+
this.callbackRunner = options.getCallbackExecutor();
161+
this.connectExecutor = options.getConnectExecutor();
162162

163163
timeTraceLogger.trace("creating reader and writer");
164164
this.reader = new NatsConnectionReader(this);

src/test/java/io/nats/client/OptionsTests.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@
3232
import java.nio.charset.StandardCharsets;
3333
import java.time.Duration;
3434
import java.util.*;
35-
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.Executors;
37-
import java.util.concurrent.Future;
38-
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.*;
3936

4037
import static io.nats.client.Options.*;
4138
import static io.nats.client.support.Encoding.base64UrlEncodeToString;
@@ -980,6 +977,30 @@ public void testDefaultExecutor() throws Exception {
980977
assertTrue(name.startsWith(Options.DEFAULT_THREAD_NAME_PREFIX));
981978
}
982979

980+
@Test
981+
public void testCallbackExecutor() throws ExecutionException, InterruptedException, TimeoutException {
982+
ThreadFactory threadFactory = r -> new Thread(r, "test");
983+
Options options = new Options.Builder()
984+
.callbackThreadFactory(threadFactory)
985+
.build();
986+
Future<?> callbackFuture = options.getCallbackExecutor().submit(() -> {
987+
assertEquals("test", Thread.currentThread().getName());
988+
});
989+
callbackFuture.get(5, TimeUnit.SECONDS);
990+
}
991+
992+
@Test
993+
public void testConnectExecutor() throws ExecutionException, InterruptedException, TimeoutException {
994+
ThreadFactory threadFactory = r -> new Thread(r, "test");
995+
Options options = new Options.Builder()
996+
.connectThreadFactory(threadFactory)
997+
.build();
998+
Future<?> connectFuture = options.getConnectExecutor().submit(() -> {
999+
assertEquals("test", Thread.currentThread().getName());
1000+
});
1001+
connectFuture.get(5, TimeUnit.SECONDS);
1002+
}
1003+
9831004
String[] schemes = new String[] { "NATS", "unk", "tls", "opentls", "ws", "wss", "nats"};
9841005
boolean[] secures = new boolean[] { false, false, true, true, false, true, false};
9851006
boolean[] wses = new boolean[] { false, false, false, false, true, true, false};

0 commit comments

Comments
 (0)