Skip to content

Commit 7833687

Browse files
committed
Trying to ensure no blocking close on force reconnect from socket write timeout
1 parent 5f7fa40 commit 7833687

File tree

3 files changed

+26
-16
lines changed

3 files changed

+26
-16
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -293,18 +293,21 @@ void forceReconnectImpl() throws IOException, InterruptedException {
293293
dataPortFuture.cancel(true);
294294
dataPortFuture = null;
295295
}
296+
297+
// close the data port as a task so as not to block reconnect
296298
if (dataPort != null) {
297-
try {
298-
dataPort.close();
299-
}
300-
catch (IOException ignore) {}
301-
finally {
302-
dataPort = null;
303-
}
299+
final DataPort closeMe = dataPort;
300+
dataPort = null;
301+
executor.submit(() -> {
302+
try {
303+
closeMe.close();
304+
}
305+
catch (IOException ignore) {}
306+
});
304307
}
305308

306309
// stop i/o
307-
reader.stop();
310+
reader.stop(false);
308311
writer.stop();
309312

310313
// new reader/writer

src/main/java/io/nats/client/impl/NatsConnectionReader.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,17 @@ void start(Future<DataPort> dataPortFuture) {
9494
this.stopped = connection.getExecutor().submit(this, Boolean.TRUE);
9595
}
9696

97+
Future<Boolean> stop() {
98+
return stop(true);
99+
}
100+
97101
// May be called several times on an error.
98102
// Returns a future that is completed when the thread completes, not when this
99103
// method does.
100-
Future<Boolean> stop() {
104+
Future<Boolean> stop(boolean shutdownDataPort) {
101105
if (running.get()) {
102106
running.set(false);
103-
if (dataPort != null) {
107+
if (shutdownDataPort && dataPort != null) {
104108
try {
105109
dataPort.shutdownInput();
106110
}

src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,15 @@ public void run() {
3838
if (System.nanoTime() > writeMustBeDoneBy) {
3939
writeWatcherTimer.cancel(); // we don't need to repeat this
4040
connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
41-
connection.getExecutor().submit(() -> {
42-
try {
43-
connection.forceReconnect();
44-
}
45-
catch (IOException | InterruptedException ignore) {}
46-
});
41+
try {
42+
connection.forceReconnect();
43+
}
44+
catch (IOException e) {
45+
// retry maybe? forceReconnect
46+
}
47+
catch (InterruptedException e) {
48+
Thread.currentThread().interrupt();
49+
}
4750
}
4851
}
4952
}

0 commit comments

Comments
 (0)