Skip to content

WebClient read timeout per request #7135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.helidon.nima.testing.junit5.webserver;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -82,6 +83,11 @@ public String channelId() {
return "unit-client";
}

@Override
public void readTimeout(Duration readTimeout) {
//NOOP
}

private DataWriter writer(ArrayBlockingQueue<byte[]> queue) {
return new DataWriter() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import io.helidon.common.GenericType;
import io.helidon.common.http.Headers;
Expand All @@ -46,6 +50,7 @@
import static io.helidon.common.testing.http.junit5.HttpHeaderMatcher.hasHeader;
import static io.helidon.common.testing.http.junit5.HttpHeaderMatcher.noHeader;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNot.not;
Expand Down Expand Up @@ -77,6 +82,7 @@ static void routing(HttpRules rules) {
rules.get("/afterRedirect", ClientRequestImplTest::afterRedirectGet);
rules.put("/afterRedirect", ClientRequestImplTest::afterRedirectPut);
rules.put("/chunkresponse", ClientRequestImplTest::chunkResponseHandler);
rules.put("/delayedEndpoint", ClientRequestImplTest::delayedHandler);
}

@Test
Expand Down Expand Up @@ -324,6 +330,22 @@ void testRedirectKeepMethod() {
}
}

@Test
void testReadTimeoutPerRequest() {
String testEntity = "Test entity";
try (Http1ClientResponse response = injectedHttp1client.put("/delayedEndpoint")
.submit(testEntity)) {
assertThat(response.status(), is(Http.Status.OK_200));
assertThat(response.as(String.class), is(testEntity));
}

UncheckedIOException ste = assertThrows(UncheckedIOException.class,
() -> injectedHttp1client.put("/delayedEndpoint")
.readTimeout(Duration.ofMillis(1))
.submit(testEntity));
assertThat(ste.getCause(), instanceOf(SocketTimeoutException.class));
}

private static void validateSuccessfulResponse(Http1Client client) {
String requestEntity = "Sending Something";
Http1ClientRequest request = client.put("/test");
Expand Down Expand Up @@ -388,6 +410,11 @@ private static void afterRedirectPut(ServerRequest req, ServerResponse res) {
.send();
}

private static void delayedHandler(ServerRequest req, ServerResponse res) throws IOException, InterruptedException {
TimeUnit.MILLISECONDS.sleep(10);
customHandler(req, res, false);
}

private static void responseHandler(ServerRequest req, ServerResponse res) throws IOException {
customHandler(req, res, false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.helidon.nima.webclient;

import java.time.Duration;

import io.helidon.common.buffers.DataReader;
import io.helidon.common.buffers.DataWriter;

Expand Down Expand Up @@ -54,4 +56,11 @@ public interface ClientConnection {
* @return id of this channel (connection)
*/
String channelId();

/**
* Read timeout for this connection.
*
* @param readTimeout connection read timeout
*/
void readTimeout(Duration readTimeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.helidon.nima.webclient.http1;

import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
Expand Down Expand Up @@ -61,6 +62,7 @@ class ClientRequestImpl implements Http1ClientRequest {
private WritableHeaders<?> explicitHeaders = WritableHeaders.create();
private boolean followRedirects;
private int maxRedirects;
private Duration readTimeout;
private Tls tls;
private String uriTemplate;
private ClientConnection connection;
Expand All @@ -76,6 +78,7 @@ class ClientRequestImpl implements Http1ClientRequest {
this.method = method;
this.uri = helper;
this.properties = new HashMap<>(properties);
this.readTimeout = clientConfig.socketOptions().readTimeout();

this.clientConfig = clientConfig;
this.mediaContext = clientConfig.mediaContext();
Expand Down Expand Up @@ -253,6 +256,17 @@ public Http1ClientRequest keepAlive(boolean keepAlive) {
return this;
}

/**
* Read timeout for this request.
*
* @param readTimeout response read timeout
* @return updated client request
*/
public Http1ClientRequest readTimeout(Duration readTimeout) {
this.readTimeout = readTimeout;
return this;
}

Http1ClientConfig clientConfig() {
return clientConfig;
}
Expand Down Expand Up @@ -345,6 +359,10 @@ public UriQuery uriQuery() {
return UriQuery.create(resolvedUri());
}

Duration readTimeout() {
return readTimeout;
}

private ClientResponseImpl invokeServices(WebClientService.Chain callChain,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.HexFormat;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -101,6 +103,18 @@ public String channelId() {
return channelId;
}

@Override
public void readTimeout(Duration readTimeout) {
if (!isConnected()) {
throw new IllegalStateException("Read timeout cannot be set, because connection has not been established.");
}
try {
socket.setSoTimeout((int) readTimeout.toMillis());
} catch (SocketException e) {
throw new UncheckedIOException("Could not set read timeout to the connection with the channel id: " + channelId, e);
}
}

boolean isConnected() {
return socket != null && socket.isConnected();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.helidon.nima.webclient.http1;

import java.time.Duration;

import io.helidon.common.http.Http;
import io.helidon.common.uri.UriPath;
import io.helidon.common.uri.UriQuery;
Expand All @@ -26,6 +28,14 @@
*/
public interface Http1ClientRequest extends ClientRequest<Http1ClientRequest, Http1ClientResponse> {

/**
* Read timeout for this request.
*
* @param readTimeout response read timeout
* @return updated client request
*/
Http1ClientRequest readTimeout(Duration readTimeout);

/**
* HTTP Method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

class HttpCallEntityChain extends HttpCallChainBase {

private final ClientRequestImpl request;
private final Http1ClientConfig clientConfig;
private final CompletableFuture<WebClientServiceRequest> whenSent;
private final CompletableFuture<WebClientServiceResponse> whenComplete;
Expand All @@ -47,6 +48,7 @@ class HttpCallEntityChain extends HttpCallChainBase {
CompletableFuture<WebClientServiceResponse> whenComplete,
Object entity) {
super(clientConfig, connection, tls, request.keepAlive());
this.request = request;
this.clientConfig = clientConfig;
this.whenSent = whenSent;
this.whenComplete = whenComplete;
Expand All @@ -66,6 +68,7 @@ public WebClientServiceResponse doProceed(ClientConnection connection,
} else {
entityBytes = entityBytes(entity, headers);
}
connection.readTimeout(request.readTimeout());

headers.set(Http.Header.create(Http.Header.CONTENT_LENGTH, entityBytes.length));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -49,7 +50,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import static io.helidon.common.testing.http.junit5.HttpHeaderMatcher.hasHeader;
import static io.helidon.common.testing.http.junit5.HttpHeaderMatcher.noHeader;
Expand Down Expand Up @@ -523,6 +523,11 @@ public String channelId() {
return null;
}

@Override
public void readTimeout(Duration readTimeout) {
//NOOP
}

// This will be used for testing the element of Prologue
String getPrologue() {
return prologue;
Expand Down