Skip to content

Commit ddb4b78

Browse files
During forceReconnect, ensure reader/writer is stopped before continuing (#1203)
1 parent 9ca3a92 commit ddb4b78

File tree

1 file changed

+23
-9
lines changed

1 file changed

+23
-9
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,16 @@ void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedExcepti
325325
}
326326

327327
// stop i/o
328-
reader.stop(false);
329-
writer.stop();
328+
try {
329+
this.reader.stop(false).get(10, TimeUnit.SECONDS);
330+
} catch (Exception ex) {
331+
processException(ex);
332+
}
333+
try {
334+
this.writer.stop().get(10, TimeUnit.SECONDS);
335+
} catch (Exception ex) {
336+
processException(ex);
337+
}
330338

331339
// new reader/writer
332340
reader = new NatsConnectionReader(this);
@@ -704,13 +712,19 @@ void handleCommunicationIssue(Exception io) {
704712
// Spawn a thread so we don't have timing issues with
705713
// waiting on read/write threads
706714
executor.submit(() -> {
707-
try {
708-
// any issue that brings us here is pretty serious
709-
// so we are comfortable forcing the close
710-
this.closeSocket(true, true);
711-
} catch (InterruptedException e) {
712-
processException(e);
713-
Thread.currentThread().interrupt();
715+
if (!tryingToConnect.get()) {
716+
try {
717+
tryingToConnect.set(true);
718+
719+
// any issue that brings us here is pretty serious
720+
// so we are comfortable forcing the close
721+
this.closeSocket(true, true);
722+
} catch (InterruptedException e) {
723+
processException(e);
724+
Thread.currentThread().interrupt();
725+
} finally {
726+
tryingToConnect.set(false);
727+
}
714728
}
715729
});
716730
}

0 commit comments

Comments
 (0)