Skip to content

Commit 6896d4e

Browse files
authored
fix: es java client record (#446)
1 parent 0b2db94 commit 6896d4e

15 files changed

+233
-267
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ AREX utilizes the advanced Java technique, Instrument API, and is capable of ins
4343
- Spring WebClient [5.0,)
4444
- Spring Template
4545
- Feign [9.0,)
46+
- Elasticsearch Client [7.x,)
4647
#### Redis Client
48+
- RedisTemplate
4749
- Jedis [2.10+, 4+]
4850
- Redisson [3.0,)
4951
- Lettuce [5.x, 6.x]

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/ApacheHttpClientModuleInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import io.arex.inst.extension.ModuleInstrumentation;
55
import io.arex.inst.extension.TypeInstrumentation;
66

7-
import io.arex.inst.httpclient.apache.async.RequestProducerInstrumentation;
7+
import io.arex.inst.httpclient.apache.async.BasicFutureInstrumentation;
88
import io.arex.inst.httpclient.apache.async.InternalHttpAsyncClientInstrumentation;
9+
import io.arex.inst.httpclient.apache.async.RequestProducerInstrumentation;
910
import io.arex.inst.httpclient.apache.sync.InternalHttpClientInstrumentation;
1011
import java.util.List;
1112

@@ -21,6 +22,7 @@ public ApacheHttpClientModuleInstrumentation() {
2122
public List<TypeInstrumentation> instrumentationTypes() {
2223
return asList(new InternalHttpClientInstrumentation(),
2324
new InternalHttpAsyncClientInstrumentation(),
24-
new RequestProducerInstrumentation());
25+
new RequestProducerInstrumentation(),
26+
new BasicFutureInstrumentation());
2527
}
2628
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.arex.inst.httpclient.apache.async;
2+
3+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
4+
import static net.bytebuddy.matcher.ElementMatchers.named;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
6+
7+
import io.arex.inst.extension.MethodInstrumentation;
8+
import io.arex.inst.extension.TypeInstrumentation;
9+
import io.arex.inst.httpclient.apache.common.ApacheHttpClientAdapter;
10+
import java.util.Collections;
11+
import java.util.List;
12+
import net.bytebuddy.asm.Advice;
13+
import net.bytebuddy.description.type.TypeDescription;
14+
import net.bytebuddy.matcher.ElementMatcher;
15+
import org.apache.http.HttpResponse;
16+
import org.apache.http.concurrent.FutureCallback;
17+
18+
/**
19+
* When using future.get() directly, caching the response inside a callback wrapper
20+
* may cause the client to read the response as empty
21+
* @since 2024/4/16
22+
*/
23+
public class BasicFutureInstrumentation extends TypeInstrumentation {
24+
25+
@Override
26+
protected ElementMatcher<TypeDescription> typeMatcher() {
27+
return named("org.apache.http.concurrent.BasicFuture");
28+
}
29+
30+
@Override
31+
public List<MethodInstrumentation> methodAdvices() {
32+
return Collections.singletonList(new MethodInstrumentation(isMethod()
33+
.and(named("completed"))
34+
.and(takesArguments(1)), FutureAdvice.class.getName()));
35+
}
36+
37+
public static class FutureAdvice {
38+
@Advice.OnMethodEnter(suppress = Throwable.class)
39+
public static void completed(@Advice.Argument(0) Object response,
40+
@Advice.FieldValue(value = "callback") FutureCallback<?> callback) {
41+
if (callback instanceof FutureCallbackWrapper) {
42+
if (((FutureCallbackWrapper<?>) callback).isNeedRecord()) {
43+
ApacheHttpClientAdapter.bufferResponseEntity((HttpResponse) response);
44+
}
45+
}
46+
}
47+
}
48+
}

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/async/FutureCallbackWrapper.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class FutureCallbackWrapper<T> implements FutureCallback<T> {
1717
// Maybe null, Just to pass the trace
1818
private final HttpClientExtractor<HttpRequest, HttpResponse> extractor;
1919

20+
private boolean needRecord;
21+
2022
public FutureCallbackWrapper(FutureCallback<T> delegate) {
2123
this(null, delegate);
2224
}
@@ -61,6 +63,14 @@ public void cancelled() {
6163
}
6264
}
6365

66+
public void setNeedRecord(boolean needRecord) {
67+
this.needRecord = needRecord;
68+
}
69+
70+
public boolean isNeedRecord() {
71+
return needRecord;
72+
}
73+
6474
public MockResult replay() {
6575
return extractor.replay();
6676
}

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/async/InternalHttpAsyncClientInstrumentation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
import static java.util.Collections.singletonList;
2121
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
2222
import static net.bytebuddy.matcher.ElementMatchers.named;
23+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
2324
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
2425
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
2526

2627
public class InternalHttpAsyncClientInstrumentation extends TypeInstrumentation {
2728

2829
@Override
2930
public ElementMatcher<TypeDescription> typeMatcher() {
30-
return named("org.apache.http.impl.nio.client.InternalHttpAsyncClient");
31+
return namedOneOf("org.apache.http.impl.nio.client.InternalHttpAsyncClient",
32+
"org.apache.http.impl.nio.client.MinimalHttpAsyncClient");
3133
}
3234

3335
@Override
@@ -59,6 +61,7 @@ public static boolean onEnter(@Advice.Argument(0) HttpAsyncRequestProducer produ
5961
if (callbackWrapper != null) {
6062
if (ContextManager.needRecord()) {
6163
// recording works in callback wrapper
64+
((FutureCallbackWrapper<?>)callbackWrapper).setNeedRecord(true);
6265
callback = callbackWrapper;
6366
} else if (ContextManager.needReplay()) {
6467
mockResult = ((FutureCallbackWrapper<?>)callbackWrapper).replay();

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/async/RequestProducerInstrumentation.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
import net.bytebuddy.asm.Advice;
1515
import net.bytebuddy.description.type.TypeDescription;
1616
import net.bytebuddy.matcher.ElementMatcher;
17+
import org.apache.http.HttpEntityEnclosingRequest;
1718
import org.apache.http.HttpRequest;
1819

1920
/**
20-
* Wrapped the http request for reading request body repeatedly.
21+
* When entity is an InputStreamEntity, we need to buffer it in advance in order to repeat the reads.
2122
* @since 2024/2/21
2223
*/
2324
public class RequestProducerInstrumentation extends TypeInstrumentation {
@@ -36,10 +37,12 @@ public List<MethodInstrumentation> methodAdvices() {
3637
}
3738

3839
static class ConstructorAdvice {
39-
@Advice.OnMethodEnter
40+
@Advice.OnMethodEnter(suppress = Throwable.class)
4041
public static void onEnter(@Advice.Argument(1) HttpRequest request) {
41-
if (ContextManager.needRecordOrReplay()) {
42-
ApacheHttpClientAdapter.wrapHttpEntity(request);
42+
if (request instanceof HttpEntityEnclosingRequest) {
43+
if (ContextManager.needRecordOrReplay()) {
44+
ApacheHttpClientAdapter.bufferRequestEntity((HttpEntityEnclosingRequest) request);
45+
}
4346
}
4447
}
4548
}

arex-instrumentation/httpclient/arex-httpclient-apache-v4/src/main/java/io/arex/inst/httpclient/apache/common/ApacheHttpClientAdapter.java

Lines changed: 42 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
package io.arex.inst.httpclient.apache.common;
22

3-
import io.arex.agent.bootstrap.util.CollectionUtil;
4-
import io.arex.agent.bootstrap.util.IOUtils;
3+
import io.arex.agent.bootstrap.util.ArrayUtils;
54
import io.arex.agent.bootstrap.util.StringUtil;
65
import io.arex.inst.httpclient.common.HttpClientAdapter;
76
import io.arex.inst.httpclient.common.HttpResponseWrapper;
87
import io.arex.inst.httpclient.common.HttpResponseWrapper.StringTuple;
98
import io.arex.inst.runtime.log.LogManager;
10-
import java.io.ByteArrayOutputStream;
9+
import java.io.IOException;
1110
import org.apache.http.Header;
1211
import org.apache.http.HttpEntity;
1312
import org.apache.http.HttpEntityEnclosingRequest;
1413
import org.apache.http.HttpRequest;
1514
import org.apache.http.HttpResponse;
1615
import org.apache.http.StatusLine;
17-
import org.apache.http.client.entity.GzipCompressingEntity;
1816
import org.apache.http.client.methods.HttpUriRequest;
1917
import org.apache.http.entity.BasicHttpEntity;
18+
import org.apache.http.entity.BufferedHttpEntity;
2019
import org.apache.http.entity.HttpEntityWrapper;
2120

2221
import java.io.ByteArrayInputStream;
@@ -31,7 +30,6 @@ public class ApacheHttpClientAdapter implements HttpClientAdapter<HttpRequest, H
3130

3231
public ApacheHttpClientAdapter(HttpRequest httpRequest) {
3332
this.httpRequest = (HttpUriRequest) httpRequest;
34-
wrapHttpEntity(httpRequest);
3533
}
3634

3735
@Override
@@ -41,27 +39,17 @@ public String getMethod() {
4139

4240
@Override
4341
public byte[] getRequestBytes() {
44-
HttpEntityEnclosingRequest enclosingRequest = enclosingRequest(httpRequest);
45-
if (enclosingRequest == null) {
42+
if (!(httpRequest instanceof HttpEntityEnclosingRequest)) {
4643
return ZERO_BYTE;
4744
}
48-
HttpEntity entity = enclosingRequest.getEntity();
49-
if (entity == null) {
50-
return ZERO_BYTE;
51-
}
52-
// getContent will throw UnsupportedOperationException
53-
if (entity instanceof GzipCompressingEntity) {
54-
return writeTo((GzipCompressingEntity) entity);
55-
}
56-
if (entity instanceof CachedHttpEntityWrapper) {
57-
return ((CachedHttpEntityWrapper) entity).getCachedBody();
58-
}
59-
try {
60-
return IOUtils.copyToByteArray(entity.getContent());
61-
} catch (Exception e) {
62-
LogManager.warn("copyToByteArray", "getRequestBytes error, uri: " + getUri(), e);
45+
HttpEntityEnclosingRequest enclosingRequest = (HttpEntityEnclosingRequest) httpRequest;
46+
if (enclosingRequest.getEntity() == null) {
6347
return ZERO_BYTE;
6448
}
49+
50+
bufferRequestEntity(enclosingRequest);
51+
52+
return getEntityBytes(enclosingRequest.getEntity());
6553
}
6654

6755
@Override
@@ -83,44 +71,32 @@ public URI getUri() {
8371

8472
@Override
8573
public HttpResponseWrapper wrap(HttpResponse response) {
74+
Header[] responseHeaders = response.getAllHeaders();
75+
if (ArrayUtils.isEmpty(responseHeaders)) {
76+
return null;
77+
}
8678

87-
List<HttpResponseWrapper.StringTuple> headers = new ArrayList<>(response.getAllHeaders().length);
88-
for (Header header : response.getAllHeaders()) {
79+
List<StringTuple> headers = new ArrayList<>(responseHeaders.length);
80+
for (Header header : responseHeaders) {
8981
if (StringUtil.isEmpty(header.getName())) {
9082
continue;
9183
}
92-
headers.add(new HttpResponseWrapper.StringTuple(header.getName(), header.getValue()));
84+
headers.add(new StringTuple(header.getName(), header.getValue()));
9385
}
9486

95-
HttpEntity httpEntity = response.getEntity();
9687
Locale locale = response.getLocale();
9788

98-
if (!check(httpEntity)) {
99-
return buildEmptyBodyResponseWrapper(response.getStatusLine().toString(), locale, headers);
89+
if (!check(response.getEntity())) {
90+
return new HttpResponseWrapper(response.getStatusLine().toString(), null,
91+
new StringTuple(locale.getLanguage(), locale.getCountry()), headers);
10092
}
10193

102-
byte[] responseBody;
103-
try {
104-
responseBody = IOUtils.copyToByteArray(httpEntity.getContent());
105-
// For release connection, see PoolingHttpClientConnectionManager#requestConnection,releaseConnection
106-
EntityUtils.consumeQuietly(httpEntity);
107-
} catch (Exception e) {
108-
LogManager.warn("AHC.wrap", "AHC copyToByteArray error, uri: " + getUri(), e);
109-
return buildEmptyBodyResponseWrapper(response.getStatusLine().toString(), locale, headers);
110-
}
111-
112-
if (httpEntity instanceof BasicHttpEntity) {
113-
((BasicHttpEntity) httpEntity).setContent(new ByteArrayInputStream(responseBody));
114-
response.setEntity(httpEntity);
115-
} else if (httpEntity instanceof HttpEntityWrapper) {
116-
// Output response normally now, later need to check revert DecompressingEntity
117-
BasicHttpEntity entity = ApacheHttpClientHelper.createHttpEntity(response);
118-
entity.setContent(new ByteArrayInputStream(responseBody));
119-
response.setEntity(entity);
120-
}
94+
// Compatible with org.apache.http.impl.client.InternalHttpClient.doExecute
95+
bufferResponseEntity(response);
12196

97+
byte[] responseBody = getEntityBytes(response.getEntity());
12298
return new HttpResponseWrapper(response.getStatusLine().toString(), responseBody,
123-
new HttpResponseWrapper.StringTuple(locale.getLanguage(), locale.getCountry()), headers);
99+
new StringTuple(locale.getLanguage(), locale.getCountry()), headers);
124100
}
125101

126102
@Override
@@ -147,16 +123,6 @@ private static void appendHeaders(HttpResponse response, List<StringTuple> heade
147123
}
148124
}
149125

150-
private HttpResponseWrapper buildEmptyBodyResponseWrapper(String statusLine, Locale locale,
151-
List<StringTuple> headers) {
152-
if (CollectionUtil.isEmpty(headers)) {
153-
LogManager.warn("AHC.wrap", "AHC response wrap failed, uri: " + getUri());
154-
return null;
155-
}
156-
return new HttpResponseWrapper(statusLine, null,
157-
new HttpResponseWrapper.StringTuple(locale.getLanguage(), locale.getCountry()), headers);
158-
}
159-
160126
private static boolean check(HttpEntity entity) {
161127
return entity instanceof BasicHttpEntity || entity instanceof HttpEntityWrapper;
162128
}
@@ -169,36 +135,36 @@ private static boolean ignoreUserAgent(String userAgent) {
169135
return userAgent != null && userAgent.contains("arex");
170136
}
171137

172-
public static void wrapHttpEntity(HttpRequest httpRequest) {
173-
HttpEntityEnclosingRequest enclosingRequest = enclosingRequest(httpRequest);
174-
if (enclosingRequest == null) {
175-
return;
176-
}
177-
HttpEntity entity = enclosingRequest.getEntity();
178-
if (entity == null || entity.isRepeatable() || entity instanceof CachedHttpEntityWrapper) {
138+
public static void bufferRequestEntity(HttpEntityEnclosingRequest enclosingRequest) {
139+
if (enclosingRequest.getEntity() == null || enclosingRequest.getEntity() instanceof BufferedHttpEntity) {
179140
return;
180141
}
181142
try {
182-
enclosingRequest.setEntity(new CachedHttpEntityWrapper(entity));
143+
enclosingRequest.setEntity(new BufferedHttpEntity(enclosingRequest.getEntity()));
183144
} catch (Exception ignore) {
184145
// ignore exception
185146
}
186147
}
187148

188-
private static HttpEntityEnclosingRequest enclosingRequest(HttpRequest httpRequest) {
189-
if (httpRequest instanceof HttpEntityEnclosingRequest) {
190-
return (HttpEntityEnclosingRequest) httpRequest;
149+
public static void bufferResponseEntity(HttpResponse response) {
150+
if (response.getEntity() == null || response.getEntity() instanceof BufferedHttpEntity) {
151+
return;
152+
}
153+
try {
154+
EntityUtils.updateEntity(response, new BufferedHttpEntity(response.getEntity()));
155+
} catch (Exception e) {
156+
// ignore exception
191157
}
192-
return null;
193158
}
194159

195-
private byte[] writeTo(GzipCompressingEntity entity) {
196-
ByteArrayOutputStream out = new ByteArrayOutputStream();
160+
private byte[] getEntityBytes(HttpEntity entity) {
161+
if (!(entity instanceof BufferedHttpEntity)) {
162+
return ZERO_BYTE;
163+
}
197164
try {
198-
entity.writeTo(out);
199-
return out.toByteArray();
200-
} catch (Exception e) {
201-
LogManager.warn("writeTo", "getRequestBytes error, uri: " + getUri(), e);
165+
return EntityUtils.toByteArray(entity);
166+
} catch (IOException e) {
167+
LogManager.warn("AHC.getEntityBytes", "getEntityBytes error, uri: " + getUri(), e);
202168
return ZERO_BYTE;
203169
}
204170
}

0 commit comments

Comments
 (0)