Skip to content

Commit a83b369

Browse files
committed
fix: es java client record
1 parent 0b2db94 commit a83b369

14 files changed

+266
-343
lines changed

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/ApacheHttpClientModuleInstrumentation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.arex.inst.extension.ModuleInstrumentation;
55
import io.arex.inst.extension.TypeInstrumentation;
66

7-
import io.arex.inst.httpclient.apache.async.RequestProducerInstrumentation;
7+
import io.arex.inst.httpclient.apache.async.BasicFutureInstrumentation;
88
import io.arex.inst.httpclient.apache.async.InternalHttpAsyncClientInstrumentation;
99
import io.arex.inst.httpclient.apache.sync.InternalHttpClientInstrumentation;
1010
import java.util.List;
@@ -21,6 +21,6 @@ public ApacheHttpClientModuleInstrumentation() {
2121
public List<TypeInstrumentation> instrumentationTypes() {
2222
return asList(new InternalHttpClientInstrumentation(),
2323
new InternalHttpAsyncClientInstrumentation(),
24-
new RequestProducerInstrumentation());
24+
new BasicFutureInstrumentation());
2525
}
2626
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.arex.inst.httpclient.apache.async;
2+
3+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
4+
import static net.bytebuddy.matcher.ElementMatchers.named;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
6+
7+
import io.arex.inst.extension.MethodInstrumentation;
8+
import io.arex.inst.extension.TypeInstrumentation;
9+
import io.arex.inst.runtime.context.ContextManager;
10+
import java.io.IOException;
11+
import java.util.Collections;
12+
import java.util.List;
13+
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import org.apache.http.HttpResponse;
17+
import org.apache.http.concurrent.FutureCallback;
18+
import org.apache.http.entity.BufferedHttpEntity;
19+
import org.apache.http.util.EntityUtils;
20+
21+
/**
22+
* When using future.get() directly, caching the response inside a callback wrapper
23+
* may cause the client to read the response as empty
24+
* @since 2024/4/16
25+
*/
26+
public class BasicFutureInstrumentation extends TypeInstrumentation {
27+
28+
@Override
29+
protected ElementMatcher<TypeDescription> typeMatcher() {
30+
return named("org.apache.http.concurrent.BasicFuture");
31+
}
32+
33+
@Override
34+
public List<MethodInstrumentation> methodAdvices() {
35+
return Collections.singletonList(new MethodInstrumentation(isMethod()
36+
.and(named("completed"))
37+
.and(takesArguments(1)),
38+
this.getClass().getName() + "$FutureAdvice"));
39+
}
40+
41+
public static class FutureAdvice {
42+
@Advice.OnMethodEnter(suppress = Throwable.class)
43+
public static void completed(@Advice.Argument(0) Object response,
44+
@Advice.FieldValue(value = "callback") FutureCallback<?> callback) throws IOException {
45+
if (callback instanceof FutureCallbackWrapper) {
46+
HttpResponse httpResponse = (HttpResponse) response;
47+
if (httpResponse.getEntity() instanceof BufferedHttpEntity) {
48+
return;
49+
}
50+
51+
if (((FutureCallbackWrapper<?>) callback).isRecord()) {
52+
EntityUtils.updateEntity(httpResponse, new BufferedHttpEntity(httpResponse.getEntity()));
53+
}
54+
}
55+
}
56+
}
57+
}

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/async/FutureCallbackWrapper.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class FutureCallbackWrapper<T> implements FutureCallback<T> {
1717
// Maybe null, Just to pass the trace
1818
private final HttpClientExtractor<HttpRequest, HttpResponse> extractor;
1919

20+
private boolean record;
21+
2022
public FutureCallbackWrapper(FutureCallback<T> delegate) {
2123
this(null, delegate);
2224
}
@@ -61,6 +63,14 @@ public void cancelled() {
6163
}
6264
}
6365

66+
public void setRecord(boolean record) {
67+
this.record = record;
68+
}
69+
70+
public boolean isRecord() {
71+
return record;
72+
}
73+
6474
public MockResult replay() {
6575
return extractor.replay();
6676
}

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/async/InternalHttpAsyncClientInstrumentation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
import static java.util.Collections.singletonList;
2121
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
2222
import static net.bytebuddy.matcher.ElementMatchers.named;
23+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
2324
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
2425
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
2526

2627
public class InternalHttpAsyncClientInstrumentation extends TypeInstrumentation {
2728

2829
@Override
2930
public ElementMatcher<TypeDescription> typeMatcher() {
30-
return named("org.apache.http.impl.nio.client.InternalHttpAsyncClient");
31+
return namedOneOf("org.apache.http.impl.nio.client.InternalHttpAsyncClient",
32+
"org.apache.http.impl.nio.client.MinimalHttpAsyncClient");
3133
}
3234

3335
@Override
@@ -59,6 +61,7 @@ public static boolean onEnter(@Advice.Argument(0) HttpAsyncRequestProducer produ
5961
if (callbackWrapper != null) {
6062
if (ContextManager.needRecord()) {
6163
// recording works in callback wrapper
64+
((FutureCallbackWrapper<?>)callbackWrapper).setRecord(true);
6265
callback = callbackWrapper;
6366
} else if (ContextManager.needReplay()) {
6467
mockResult = ((FutureCallbackWrapper<?>)callbackWrapper).replay();

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/async/RequestProducerInstrumentation.java

Lines changed: 0 additions & 46 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
package io.arex.inst.httpclient.apache.common;
22

33
import io.arex.agent.bootstrap.util.CollectionUtil;
4-
import io.arex.agent.bootstrap.util.IOUtils;
54
import io.arex.agent.bootstrap.util.StringUtil;
65
import io.arex.inst.httpclient.common.HttpClientAdapter;
76
import io.arex.inst.httpclient.common.HttpResponseWrapper;
87
import io.arex.inst.httpclient.common.HttpResponseWrapper.StringTuple;
98
import io.arex.inst.runtime.log.LogManager;
10-
import java.io.ByteArrayOutputStream;
9+
import java.io.IOException;
1110
import org.apache.http.Header;
1211
import org.apache.http.HttpEntity;
1312
import org.apache.http.HttpEntityEnclosingRequest;
1413
import org.apache.http.HttpRequest;
1514
import org.apache.http.HttpResponse;
1615
import org.apache.http.StatusLine;
17-
import org.apache.http.client.entity.GzipCompressingEntity;
1816
import org.apache.http.client.methods.HttpUriRequest;
1917
import org.apache.http.entity.BasicHttpEntity;
18+
import org.apache.http.entity.BufferedHttpEntity;
2019
import org.apache.http.entity.HttpEntityWrapper;
2120

2221
import java.io.ByteArrayInputStream;
@@ -31,7 +30,6 @@ public class ApacheHttpClientAdapter implements HttpClientAdapter<HttpRequest, H
3130

3231
public ApacheHttpClientAdapter(HttpRequest httpRequest) {
3332
this.httpRequest = (HttpUriRequest) httpRequest;
34-
wrapHttpEntity(httpRequest);
3533
}
3634

3735
@Override
@@ -41,27 +39,25 @@ public String getMethod() {
4139

4240
@Override
4341
public byte[] getRequestBytes() {
44-
HttpEntityEnclosingRequest enclosingRequest = enclosingRequest(httpRequest);
45-
if (enclosingRequest == null) {
42+
if (!(httpRequest instanceof HttpEntityEnclosingRequest)) {
4643
return ZERO_BYTE;
4744
}
48-
HttpEntity entity = enclosingRequest.getEntity();
49-
if (entity == null) {
45+
HttpEntityEnclosingRequest enclosingRequest = (HttpEntityEnclosingRequest) httpRequest;
46+
if (enclosingRequest.getEntity() == null) {
5047
return ZERO_BYTE;
5148
}
52-
// getContent will throw UnsupportedOperationException
53-
if (entity instanceof GzipCompressingEntity) {
54-
return writeTo((GzipCompressingEntity) entity);
55-
}
56-
if (entity instanceof CachedHttpEntityWrapper) {
57-
return ((CachedHttpEntityWrapper) entity).getCachedBody();
58-
}
59-
try {
60-
return IOUtils.copyToByteArray(entity.getContent());
61-
} catch (Exception e) {
62-
LogManager.warn("copyToByteArray", "getRequestBytes error, uri: " + getUri(), e);
63-
return ZERO_BYTE;
49+
50+
bufferRequestEntity(enclosingRequest);
51+
52+
if (enclosingRequest.getEntity() instanceof BufferedHttpEntity) {
53+
try {
54+
return EntityUtils.toByteArray(enclosingRequest.getEntity());
55+
} catch (IOException e) {
56+
LogManager.warn("AHC.wrap", "toByteArray error, uri: " + getUri(), e);
57+
return ZERO_BYTE;
58+
}
6459
}
60+
return ZERO_BYTE;
6561
}
6662

6763
@Override
@@ -83,13 +79,12 @@ public URI getUri() {
8379

8480
@Override
8581
public HttpResponseWrapper wrap(HttpResponse response) {
86-
87-
List<HttpResponseWrapper.StringTuple> headers = new ArrayList<>(response.getAllHeaders().length);
82+
List<StringTuple> headers = new ArrayList<>(response.getAllHeaders().length);
8883
for (Header header : response.getAllHeaders()) {
8984
if (StringUtil.isEmpty(header.getName())) {
9085
continue;
9186
}
92-
headers.add(new HttpResponseWrapper.StringTuple(header.getName(), header.getValue()));
87+
headers.add(new StringTuple(header.getName(), header.getValue()));
9388
}
9489

9590
HttpEntity httpEntity = response.getEntity();
@@ -99,28 +94,21 @@ public HttpResponseWrapper wrap(HttpResponse response) {
9994
return buildEmptyBodyResponseWrapper(response.getStatusLine().toString(), locale, headers);
10095
}
10196

102-
byte[] responseBody;
97+
// Compatible with org.apache.http.impl.client.InternalHttpClient.doExecute
98+
bufferResponseEntity(response);
99+
100+
byte[] responseBody = new byte[0];
103101
try {
104-
responseBody = IOUtils.copyToByteArray(httpEntity.getContent());
105-
// For release connection, see PoolingHttpClientConnectionManager#requestConnection,releaseConnection
106-
EntityUtils.consumeQuietly(httpEntity);
102+
if (response.getEntity() instanceof BufferedHttpEntity) {
103+
responseBody = EntityUtils.toByteArray(response.getEntity());
104+
}
107105
} catch (Exception e) {
108106
LogManager.warn("AHC.wrap", "AHC copyToByteArray error, uri: " + getUri(), e);
109107
return buildEmptyBodyResponseWrapper(response.getStatusLine().toString(), locale, headers);
110108
}
111109

112-
if (httpEntity instanceof BasicHttpEntity) {
113-
((BasicHttpEntity) httpEntity).setContent(new ByteArrayInputStream(responseBody));
114-
response.setEntity(httpEntity);
115-
} else if (httpEntity instanceof HttpEntityWrapper) {
116-
// Output response normally now, later need to check revert DecompressingEntity
117-
BasicHttpEntity entity = ApacheHttpClientHelper.createHttpEntity(response);
118-
entity.setContent(new ByteArrayInputStream(responseBody));
119-
response.setEntity(entity);
120-
}
121-
122110
return new HttpResponseWrapper(response.getStatusLine().toString(), responseBody,
123-
new HttpResponseWrapper.StringTuple(locale.getLanguage(), locale.getCountry()), headers);
111+
new StringTuple(locale.getLanguage(), locale.getCountry()), headers);
124112
}
125113

126114
@Override
@@ -154,7 +142,7 @@ private HttpResponseWrapper buildEmptyBodyResponseWrapper(String statusLine, Loc
154142
return null;
155143
}
156144
return new HttpResponseWrapper(statusLine, null,
157-
new HttpResponseWrapper.StringTuple(locale.getLanguage(), locale.getCountry()), headers);
145+
new StringTuple(locale.getLanguage(), locale.getCountry()), headers);
158146
}
159147

160148
private static boolean check(HttpEntity entity) {
@@ -169,37 +157,25 @@ private static boolean ignoreUserAgent(String userAgent) {
169157
return userAgent != null && userAgent.contains("arex");
170158
}
171159

172-
public static void wrapHttpEntity(HttpRequest httpRequest) {
173-
HttpEntityEnclosingRequest enclosingRequest = enclosingRequest(httpRequest);
174-
if (enclosingRequest == null) {
175-
return;
176-
}
177-
HttpEntity entity = enclosingRequest.getEntity();
178-
if (entity == null || entity.isRepeatable() || entity instanceof CachedHttpEntityWrapper) {
160+
private static void bufferRequestEntity(HttpEntityEnclosingRequest enclosingRequest) {
161+
if (enclosingRequest.getEntity() instanceof BufferedHttpEntity) {
179162
return;
180163
}
181164
try {
182-
enclosingRequest.setEntity(new CachedHttpEntityWrapper(entity));
165+
enclosingRequest.setEntity(new BufferedHttpEntity(enclosingRequest.getEntity()));
183166
} catch (Exception ignore) {
184167
// ignore exception
185168
}
186169
}
187170

188-
private static HttpEntityEnclosingRequest enclosingRequest(HttpRequest httpRequest) {
189-
if (httpRequest instanceof HttpEntityEnclosingRequest) {
190-
return (HttpEntityEnclosingRequest) httpRequest;
171+
public void bufferResponseEntity(HttpResponse response) {
172+
if (response.getEntity() instanceof BufferedHttpEntity) {
173+
return;
191174
}
192-
return null;
193-
}
194-
195-
private byte[] writeTo(GzipCompressingEntity entity) {
196-
ByteArrayOutputStream out = new ByteArrayOutputStream();
197175
try {
198-
entity.writeTo(out);
199-
return out.toByteArray();
176+
EntityUtils.updateEntity(response, new BufferedHttpEntity(response.getEntity()));
200177
} catch (Exception e) {
201-
LogManager.warn("writeTo", "getRequestBytes error, uri: " + getUri(), e);
202-
return ZERO_BYTE;
178+
// ignore exception
203179
}
204180
}
205181
}

0 commit comments

Comments
 (0)