Skip to content

Commit 37d03e8

Browse files
authored
Allow user to set socket read timeout option (#1188)
1 parent 606699e commit 37d03e8

File tree

4 files changed

+63
-8
lines changed

4 files changed

+63
-8
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ plugins {
1313
id 'signing'
1414
}
1515

16-
def jarVersion = "2.19.2"
16+
def jarVersion = "2.20.0"
1717

1818
def isRelease = System.getenv("BUILD_EVENT") == "release"
1919
def brn = System.getenv("BRANCH_REF_NAME")

src/main/java/io/nats/client/Options.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public class Options {
125125
*/
126126
public static final long MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT = 100;
127127

128+
/**
129+
* Constant used for calculating if a socket read timeout is large enough.
130+
*/
131+
public static final long MINIMUM_SOCKET_READ_TIMEOUT_GT_CONNECTION_TIMEOUT = 100;
132+
128133
/**
129134
* Default server ping interval. The client will send a ping to the server on this interval to insure liveness.
130135
* The server may send pings to the client as well, these are handled automatically by the library,
@@ -271,6 +276,11 @@ public class Options {
271276
* {@link Builder#connectionTimeout(Duration) connectionTimeout}.
272277
*/
273278
public static final String PROP_CONNECTION_TIMEOUT = PFX + "timeout";
279+
/**
280+
* Property used to configure a builder from a Properties object. {@value}, see
281+
* {@link Builder#socketReadTimeoutMillis(int) socketReadTimeoutMillis}.
282+
*/
283+
public static final String PROP_SOCKET_READ_TIMEOUT_MS = PFX + "socket.read.timeout.ms";
274284
/**
275285
* Property used to configure a builder from a Properties object. {@value}, see
276286
* {@link Builder#socketWriteTimeout(long) socketWriteTimeout}.
@@ -591,6 +601,7 @@ public class Options {
591601
private final Duration reconnectJitter;
592602
private final Duration reconnectJitterTls;
593603
private final Duration connectionTimeout;
604+
private final int socketReadTimeoutMillis;
594605
private final Duration socketWriteTimeout;
595606
private final int socketSoLinger;
596607
private final Duration pingInterval;
@@ -704,6 +715,7 @@ public static class Builder {
704715
private Duration reconnectJitter = DEFAULT_RECONNECT_JITTER;
705716
private Duration reconnectJitterTls = DEFAULT_RECONNECT_JITTER_TLS;
706717
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
718+
private int socketReadTimeoutMillis = 0;
707719
private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT;
708720
private int socketSoLinger = -1;
709721
private Duration pingInterval = DEFAULT_PING_INTERVAL;
@@ -840,6 +852,7 @@ public Builder properties(Properties props) {
840852
durationProperty(props, PROP_RECONNECT_JITTER_TLS, DEFAULT_RECONNECT_JITTER_TLS, d -> this.reconnectJitterTls = d);
841853
longProperty(props, PROP_RECONNECT_BUF_SIZE, DEFAULT_RECONNECT_BUF_SIZE, l -> this.reconnectBufferSize = l);
842854
durationProperty(props, PROP_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, d -> this.connectionTimeout = d);
855+
intProperty(props, PROP_SOCKET_READ_TIMEOUT_MS, -1, i -> this.socketReadTimeoutMillis = i);
843856
durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d);
844857
intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i);
845858

@@ -1273,6 +1286,16 @@ public Builder connectionTimeout(long connectionTimeoutMillis) {
12731286
return this;
12741287
}
12751288

1289+
/**
1290+
* Set the timeout to use around socket reads
1291+
* @param socketReadTimeoutMillis the timeout milliseconds
1292+
* @return the Builder for chaining
1293+
*/
1294+
public Builder socketReadTimeoutMillis(int socketReadTimeoutMillis) {
1295+
this.socketReadTimeoutMillis = socketReadTimeoutMillis;
1296+
return this;
1297+
}
1298+
12761299
/**
12771300
* Set the timeout to use around socket writes
12781301
* @param socketWriteTimeoutMillis the timeout milliseconds
@@ -1285,7 +1308,7 @@ public Builder socketWriteTimeout(long socketWriteTimeoutMillis) {
12851308

12861309
/**
12871310
* Set the timeout to use around socket writes
1288-
* @param socketWriteTimeout the timeout milliseconds
1311+
* @param socketWriteTimeout the timeout duration
12891312
* @return the Builder for chaining
12901313
*/
12911314
public Builder socketWriteTimeout(Duration socketWriteTimeout) {
@@ -1311,7 +1334,7 @@ public Builder socketSoLinger(int socketSoLinger) {
13111334
/**
13121335
* Set the interval between attempts to pings the server. These pings are automated,
13131336
* and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library
1314-
* may way up to 2 * time to send a ping. Incoming traffic from the server can postpone
1337+
* may wait up to 2 * time to send a ping. Incoming traffic from the server can postpone
13151338
* the next ping to avoid pings taking up bandwidth during busy messaging.
13161339
* Keep in mind that a ping requires a round trip to the server. Setting this value to a small
13171340
* number can result in quick failures due to maxPingsOut being reached, these failures will
@@ -1323,7 +1346,7 @@ public Builder socketSoLinger(int socketSoLinger) {
13231346
* @return the Builder for chaining
13241347
*/
13251348
public Builder pingInterval(Duration time) {
1326-
this.pingInterval = time;
1349+
this.pingInterval = time == null ? DEFAULT_PING_INTERVAL : time;
13271350
return this;
13281351
}
13291352

@@ -1772,6 +1795,15 @@ else if (useDefaultTls) {
17721795
new DefaultThreadFactory(threadPrefix));
17731796
}
17741797

1798+
if (socketReadTimeoutMillis > 0) {
1799+
long srtMin = pingInterval.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
1800+
if (socketReadTimeoutMillis < srtMin) {
1801+
throw new IllegalStateException("Socket Read Timeout must be at least "
1802+
+ MINIMUM_SOCKET_READ_TIMEOUT_GT_CONNECTION_TIMEOUT
1803+
+ " milliseconds greater than the Ping Interval");
1804+
}
1805+
}
1806+
17751807
if (socketWriteTimeout == null || socketWriteTimeout.toMillis() < 1) {
17761808
socketWriteTimeout = null;
17771809
}
@@ -1833,6 +1865,7 @@ public Builder(Options o) {
18331865
this.reconnectJitter = o.reconnectJitter;
18341866
this.reconnectJitterTls = o.reconnectJitterTls;
18351867
this.connectionTimeout = o.connectionTimeout;
1868+
this.socketReadTimeoutMillis = o.socketReadTimeoutMillis;
18361869
this.socketWriteTimeout = o.socketWriteTimeout;
18371870
this.socketSoLinger = o.socketSoLinger;
18381871
this.pingInterval = o.pingInterval;
@@ -1896,6 +1929,7 @@ private Options(Builder b) {
18961929
this.reconnectJitter = b.reconnectJitter;
18971930
this.reconnectJitterTls = b.reconnectJitterTls;
18981931
this.connectionTimeout = b.connectionTimeout;
1932+
this.socketReadTimeoutMillis = b.socketReadTimeoutMillis;
18991933
this.socketWriteTimeout = b.socketWriteTimeout;
19001934
this.socketSoLinger = b.socketSoLinger;
19011935
this.pingInterval = b.pingInterval;
@@ -2213,7 +2247,14 @@ public Duration getConnectionTimeout() {
22132247
}
22142248

22152249
/**
2216-
* @return the socketWriteTimeout, see {@link Builder#socketWriteTimeout(long) socketWriteTimeout()} in the builder doc
2250+
* @return the socketReadTimeoutMillis, see {@link Builder#socketReadTimeoutMillis(int) socketReadTimeoutMillis} in the builder doc
2251+
*/
2252+
public int getSocketReadTimeoutMillis() {
2253+
return socketReadTimeoutMillis;
2254+
}
2255+
2256+
/**
2257+
* @return the socketWriteTimeout, see {@link Builder#socketWriteTimeout(long) socketWriteTimeout} in the builder doc
22172258
*/
22182259
public Duration getSocketWriteTimeout() {
22192260
return socketWriteTimeout;

src/main/java/io/nats/client/impl/SocketDataPort.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws
8585
if (soLinger > -1) {
8686
socket.setSoLinger(true, soLinger);
8787
}
88+
if (options.getSocketReadTimeoutMillis() > 0) {
89+
socket.setSoTimeout(options.getSocketReadTimeoutMillis());
90+
}
8891

8992
if (isWebsocketScheme(nuri.getScheme())) {
9093
if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {

src/test/java/io/nats/client/OptionsTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.TimeUnit;
3939

40-
import static io.nats.client.Options.DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE;
40+
import static io.nats.client.Options.*;
4141
import static io.nats.client.support.Encoding.base64UrlEncodeToString;
4242
import static io.nats.client.support.NatsConstants.DEFAULT_PORT;
4343
import static org.junit.jupiter.api.Assertions.*;
@@ -93,7 +93,7 @@ private static void _testDefaultOptions(Options o) {
9393

9494
assertEquals(Options.DEFAULT_RECONNECT_WAIT, o.getReconnectWait(), "default reconnect wait");
9595
assertEquals(Options.DEFAULT_CONNECTION_TIMEOUT, o.getConnectionTimeout(), "default connection timeout");
96-
assertEquals(Options.DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
96+
assertEquals(DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
9797
assertEquals(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL, o.getRequestCleanupInterval(),
9898
"default cleanup interval");
9999

@@ -599,7 +599,7 @@ private static void _testDefaultPropertyIntOptions(Options o) {
599599
assertEquals(Options.DEFAULT_MAX_CONTROL_LINE, o.getMaxControlLine(), "default max control line");
600600
assertEquals(Options.DEFAULT_RECONNECT_WAIT, o.getReconnectWait(), "default reconnect wait");
601601
assertEquals(Options.DEFAULT_CONNECTION_TIMEOUT, o.getConnectionTimeout(), "default connection timeout");
602-
assertEquals(Options.DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
602+
assertEquals(DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
603603
assertEquals(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL, o.getRequestCleanupInterval(),
604604
"default cleanup interval");
605605
assertEquals(DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE, o.getMaxMessagesInOutgoingQueue(),
@@ -800,6 +800,17 @@ public void testDefaultDataPort() {
800800
assertEquals(SocketDataPortWithWriteTimeout.class.getCanonicalName(), dataPort.getClass().getCanonicalName(), "new default dataPort");
801801
}
802802

803+
@Test
804+
public void testTimeoutValidations() {
805+
assertThrows(IllegalStateException.class, () -> Options.builder()
806+
.socketReadTimeoutMillis((int)DEFAULT_PING_INTERVAL.toMillis())
807+
.build());
808+
809+
assertThrows(IllegalStateException.class, () -> Options.builder()
810+
.socketWriteTimeout(DEFAULT_CONNECTION_TIMEOUT)
811+
.build());
812+
}
813+
803814
@Test
804815
public void testPropertyDataPortType() {
805816
Properties props = new Properties();

0 commit comments

Comments
 (0)