Skip to content

8361249: PlainHttpConnection connection logic can be simplified #26087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ class PlainHttpConnection extends HttpConnection {
private final ReentrantLock stateLock = new ReentrantLock();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();

// Indicates whether a connection attempt has succeeded or should be retried.
// If the attempt failed, and shouldn't be retried, there will be an exception
// Indicates whether a connection attempt has succeeded.
// If the attempt failed there will be an exception
// instead.
private enum ConnectState { SUCCESS, RETRY }
// CONNECTED is used when connect() succeeded immediately
// CONNECT_FINISHED is used when connect succeeded asynchronously
private enum ConnectState { CONNECTED, CONNECT_FINISHED }


/**
Expand Down Expand Up @@ -147,15 +149,9 @@ public void handle() {
assert finished || exchange.multi.requestCancelled() : "Expected channel to be connected";
if (connectionOpened()) {
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> ConnectState.SUCCESS, client().theExecutor());
cf.completeAsync(() -> ConnectState.CONNECT_FINISHED, client().theExecutor());
} else throw new ConnectException("Connection closed");
} catch (Throwable e) {
if (canRetryConnect(e)) {
unsuccessfulAttempts++;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> ConnectState.RETRY, client().theExecutor());
return;
}
Throwable t = getError(Utils.toConnectException(e));
// complete async since the event runs on the SelectorManager thread
client().theExecutor().execute( () -> cf.completeExceptionally(t));
Expand Down Expand Up @@ -212,7 +208,7 @@ public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
if (finished) {
if (debug.on()) debug.log("connect finished without blocking");
if (connectionOpened()) {
cf.complete(ConnectState.SUCCESS);
cf.complete(ConnectState.CONNECTED);
} else throw getError(new ConnectException("connection closed"));
} else {
if (debug.on()) debug.log("registering connect event");
Expand All @@ -232,8 +228,10 @@ public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
debug.log("Failed to close channel after unsuccessful connect");
}
}
return cf.handle((r,t) -> checkRetryConnect(r, t, exchange))
.thenCompose(Function.identity());
return cf.thenApply((state)->{
if (debug.on()) debug.log("%s: %s", label(), state);
return null;
});
}

boolean connectionOpened() {
Expand All @@ -254,42 +252,6 @@ boolean connectionOpened() {
return !closed;
}

/**
* On some platforms, a ConnectEvent may be raised and a ConnectionException
* may occur with the message "Connection timed out: no further information"
* before our actual connection timeout has expired. In this case, this
* method will be called with a {@code connect} state of {@code ConnectState.RETRY)}
* and we will retry once again.
* @param connect indicates whether the connection was successful or should be retried
* @param failed the failure if the connection failed
* @param exchange the exchange
* @return a completable future that will take care of retrying the connection if needed.
*/
private CompletableFuture<Void> checkRetryConnect(ConnectState connect, Throwable failed, Exchange<?> exchange) {
// first check if the connection failed
if (failed != null) return MinimalFuture.failedFuture(failed);
// then check if the connection should be retried
if (connect == ConnectState.RETRY) {
int attempts = unsuccessfulAttempts;
assert attempts <= 1;
if (debug.on())
debug.log("Retrying connect after %d attempts", attempts);
return connectAsync(exchange);
}
// Otherwise, the connection was successful;
assert connect == ConnectState.SUCCESS;
return MinimalFuture.completedFuture(null);
}

private boolean canRetryConnect(Throwable e) {
if (!MultiExchange.RETRY_CONNECT) return false;
if (!(e instanceof ConnectException)) return false;
if (unsuccessfulAttempts > 0) return false;
ConnectTimerEvent timer = connectTimerEvent;
if (timer == null) return true;
return timer.deadline().isAfter(TimeSource.now());
}

@Override
public CompletableFuture<Void> finishConnect() {
assert connected == false;
Expand Down