Skip to content

Commit 17c4779

Browse files
committed
handleCommunicationIssue uses forceClose
1 parent aa60105 commit 17c4779

File tree

2 files changed

+26
-21
lines changed

2 files changed

+26
-21
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,9 @@ public void run() {
628628
} catch (Exception exp) {
629629
processException(exp);
630630
try {
631-
this.closeSocket(false);
631+
// allow force reconnect since this is pretty exceptional,
632+
// a connection failure while trying to connect
633+
this.closeSocket(false, true);
632634
} catch (InterruptedException e) {
633635
processException(e);
634636
}
@@ -691,7 +693,9 @@ void handleCommunicationIssue(Exception io) {
691693
// waiting on read/write threads
692694
executor.submit(() -> {
693695
try {
694-
this.closeSocket(true);
696+
// any issue that brings us here is pretty serious
697+
// so we are comfortable forcing the close
698+
this.closeSocket(true, true);
695699
} catch (InterruptedException e) {
696700
processException(e);
697701
Thread.currentThread().interrupt();
@@ -701,7 +705,7 @@ void handleCommunicationIssue(Exception io) {
701705

702706
// Close socket is called when another connect attempt is possible
703707
// Close is called when the connection should shut down, period
704-
void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
708+
void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
705709
// Ensure we close the socket exclusively within one thread.
706710
closeSocketLock.lock();
707711
try {
@@ -720,7 +724,7 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
720724
statusLock.unlock();
721725
}
722726

723-
closeSocketImpl();
727+
closeSocketImpl(forceClose);
724728

725729
statusLock.lock();
726730
try {
@@ -749,10 +753,10 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
749753
*/
750754
@Override
751755
public void close() throws InterruptedException {
752-
this.close(true);
756+
this.close(true, false);
753757
}
754758

755-
void close(boolean checkDrainStatus) throws InterruptedException {
759+
void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
756760
statusLock.lock();
757761
try {
758762
if (checkDrainStatus && this.isDraining()) {
@@ -779,7 +783,7 @@ void close(boolean checkDrainStatus) throws InterruptedException {
779783
this.reconnectWaiter.cancel(true);
780784
}
781785

782-
closeSocketImpl();
786+
closeSocketImpl(forceClose);
783787

784788
this.dispatchers.forEach((nuid, d) -> d.stop(false));
785789

@@ -831,7 +835,7 @@ void close(boolean checkDrainStatus) throws InterruptedException {
831835
}
832836

833837
// Should only be called from closeSocket or close
834-
void closeSocketImpl() {
838+
void closeSocketImpl(boolean forceClose) {
835839
this.currentServer = null;
836840

837841
// Signal both to stop.
@@ -854,8 +858,13 @@ void closeSocketImpl() {
854858

855859
// Close the current socket and cancel anyone waiting for it
856860
try {
857-
if (this.dataPort != null) {
858-
this.dataPort.close();
861+
if (dataPort != null) {
862+
if (forceClose) {
863+
dataPort.forceClose();
864+
}
865+
else {
866+
dataPort.close();
867+
}
859868
}
860869

861870
} catch (IOException ex) {
@@ -2121,7 +2130,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
21212130
try {
21222131
this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
21232132
} catch (Exception e) {
2124-
this.close(false);
2133+
this.close(false, false);
21252134
throw e;
21262135
}
21272136

@@ -2163,13 +2172,13 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
21632172
}
21642173
}
21652174

2166-
this.close(false); // close the connection after the last flush
2175+
this.close(false, false); // close the connection after the last flush
21672176
tracker.complete(consumers.isEmpty());
21682177
} catch (TimeoutException | InterruptedException e) {
21692178
this.processException(e);
21702179
} finally {
21712180
try {
2172-
this.close(false);// close the connection after the last flush
2181+
this.close(false, false);// close the connection after the last flush
21732182
} catch (InterruptedException e) {
21742183
processException(e);
21752184
Thread.currentThread().interrupt();

src/main/java/io/nats/client/impl/SocketDataPort.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.OutputStream;
2626
import java.net.InetSocketAddress;
2727
import java.net.Socket;
28+
import java.net.SocketException;
2829
import java.net.URISyntaxException;
2930
import java.time.Duration;
3031
import java.util.concurrent.CompletableFuture;
@@ -166,15 +167,10 @@ public void close() throws IOException {
166167
@Override
167168
public void forceClose() throws IOException {
168169
try {
169-
// If we are here, and are being asked to force close,
170-
// there is no need to linger. The dev might have set
171-
// their own linger, in which case use theirs,
172-
// otherwise set it to 0 for the quickest close
173-
if (soLinger < 0) {
174-
socket.setSoLinger(true, 0);
175-
}
170+
// If we are being asked to force close, there is no need to linger.
171+
socket.setSoLinger(true, 0);
176172
}
177-
catch (IOException e) {
173+
catch (SocketException e) {
178174
// don't want to fail if I couldn't set linger
179175
}
180176
close();

0 commit comments

Comments
 (0)