Skip to content

Commit db18e20

Browse files
committed
release 9.2.1
Signed-off-by: neo <[email protected]>
1 parent 76f7ea9 commit db18e20

16 files changed

+163
-186
lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
## Change log
22

3-
### 9.2.0 (4/24/2025 - 5/19/2025)
3+
### 9.2.1 (4/24/2025 - 5/19/2025) !!! only support java 24
44

55
* java: update to 24
66
> refer to https://openjdk.org/jeps/491
7-
* search: update to ES 9.0.0
8-
> migrated to apache httpclient 5
7+
* search: update to ES 8.17.1
98
> simplify logging
109
> remove opentelemetry dependency
10+
> !!! es java client 9.x doesn't support with es server 8.x, es java client is forward compatible
11+
> refer to https://www.elastic.co/docs/reference/elasticsearch/clients/java
1112
* action: added ActionLogContext.maxProcessTime(duration)
1213
> for long and non-critical action (such as test jobs), set max process time to avoid SLOW_PROCESS warning
1314

TODO.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@
2121
* migrate to dragonflydb and support RESP3 (cluster / MOVED handling) ?
2222
* migrate to opensearch ?
2323
* log exporter, reimplement in rust?
24+
25+
* update ES to 9.x

build.gradle.kts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ apply(plugin = "project")
77

88
subprojects {
99
group = "core.framework"
10-
version = "9.2.0"
10+
version = "9.2.1"
1111
repositories {
1212
maven {
1313
url = uri("https://neowu.github.io/maven-repo/")
@@ -18,8 +18,8 @@ subprojects {
1818
}
1919
}
2020

21-
val elasticVersion = "9.0.0"
22-
val jacksonVersion = "2.18.3"
21+
val elasticVersion = "8.18.1"
22+
val jacksonVersion = "2.18.4"
2323
val junitVersion = "5.12.2"
2424
val mockitoVersion = "5.17.0"
2525
val assertjVersion = "3.27.3"
@@ -90,7 +90,6 @@ project("core-ng-search") {
9090
api("co.elastic.clients:elasticsearch-java:${elasticVersion}") {
9191
exclude(group = "io.opentelemetry")
9292
}
93-
implementation("commons-logging:commons-logging:1.3.5")
9493
implementation("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}")
9594
testImplementation(project(":core-ng-test"))
9695
}
@@ -109,7 +108,6 @@ project("core-ng-search-test") {
109108
implementation("core.framework.elasticsearch.module:lang-painless:${elasticVersion}")
110109
implementation("core.framework.elasticsearch.module:analysis-common:${elasticVersion}") // used by elasticsearch stemmer
111110
implementation("core.framework.elasticsearch.module:reindex:${elasticVersion}") // used by elasticsearch deleteByQuery
112-
implementation("org.apache.httpcomponents:httpasyncclient:4.1.5") // used by reindex
113111
runtimeOnly("org.apache.logging.log4j:log4j-to-slf4j:2.19.0")
114112
}
115113
}

core-ng-search-test/src/main/java/core/framework/search/impl/LocalElasticSearch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import core.framework.util.Files;
44
import core.framework.util.StopWatch;
5-
import org.apache.hc.core5.http.HttpHost;
5+
import org.apache.http.HttpHost;
66
import org.elasticsearch.cluster.ClusterName;
77
import org.elasticsearch.common.network.NetworkService;
88
import org.elasticsearch.common.settings.Settings;

core-ng-search-test/src/main/java/core/framework/search/module/TestSearchConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import core.framework.internal.module.ModuleContext;
44
import core.framework.internal.module.ShutdownHook;
55
import core.framework.search.impl.LocalElasticSearch;
6-
import org.apache.hc.core5.http.HttpHost;
6+
import org.apache.http.HttpHost;
77
import org.elasticsearch.common.logging.LogConfigurator;
88

99
import java.util.concurrent.locks.ReentrantLock;

core-ng-search-test/src/test/java/core/framework/search/impl/ElasticSearchAggregationIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void aggregate() {
6262
assertThat(response.hits).hasSize(1);
6363
assertThat(response.aggregations).containsKeys("total_value");
6464

65-
int sum = response.aggregations.get("total_value").sum().value().intValue();
65+
int sum = (int) response.aggregations.get("total_value").sum().value();
6666
assertThat(sum).isEqualTo(21);
6767
}
6868

core-ng-search-test/src/test/java/core/framework/search/impl/ElasticSearchIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ void search() {
124124
.filter(term("enum_field", JSON.toEnumValue(TestDocument.TestEnum.VALUE1))));
125125

126126
request.sorts.add(SortOptions.of(builder -> builder.script(s ->
127-
s.script(script -> script.source(source -> source.scriptString("doc['int_field'].value * 3"))).type(ScriptSortType.Number))));
127+
s.script(script -> script.source("doc['int_field'].value * 3")).type(ScriptSortType.Number))));
128128

129129
SearchResponse<TestDocument> response = documentType.search(request);
130130

core-ng-search/src/main/java/core/framework/search/ElasticSearchMigration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import core.framework.internal.module.PropertyManager;
44
import core.framework.search.impl.ElasticSearchHost;
55
import core.framework.search.impl.ElasticSearchImpl;
6-
import org.apache.hc.core5.http.HttpHost;
6+
import org.apache.http.HttpHost;
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

core-ng-search/src/main/java/core/framework/search/impl/ElasticSearchHost.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package core.framework.search.impl;
22

33
import core.framework.util.Strings;
4-
import org.apache.hc.core5.http.HttpHost;
4+
import org.apache.http.HttpHost;
55

66
/**
77
* @author neo
@@ -34,6 +34,6 @@ private static HttpHost host(String value) {
3434
port = Integer.parseInt(value.substring(portIndex + 1));
3535
hostEnd = portIndex;
3636
}
37-
return new HttpHost(schema, value.substring(hostStart, hostEnd), port);
37+
return new HttpHost(value.substring(hostStart, hostEnd), port, schema);
3838
}
3939
}

core-ng-search/src/main/java/core/framework/search/impl/ElasticSearchImpl.java

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,8 @@
66
import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
77
import co.elastic.clients.json.JsonData;
88
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
9-
import co.elastic.clients.transport.rest5_client.Rest5ClientTransport;
10-
import co.elastic.clients.transport.rest5_client.low_level.Request;
11-
import co.elastic.clients.transport.rest5_client.low_level.Response;
12-
import co.elastic.clients.transport.rest5_client.low_level.Rest5Client;
13-
import co.elastic.clients.transport.rest5_client.low_level.Rest5ClientBuilder;
9+
import co.elastic.clients.transport.instrumentation.NoopInstrumentation;
10+
import co.elastic.clients.transport.rest_client.RestClientTransport;
1411
import com.fasterxml.jackson.annotation.JsonInclude;
1512
import com.fasterxml.jackson.databind.ObjectMapper;
1613
import core.framework.internal.json.JSONMapper;
@@ -21,26 +18,22 @@
2118
import core.framework.search.SearchException;
2219
import core.framework.util.Encodings;
2320
import core.framework.util.StopWatch;
24-
import org.apache.hc.client5.http.config.ConnectionConfig;
25-
import org.apache.hc.client5.http.config.RequestConfig;
26-
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
27-
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
28-
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
29-
import org.apache.hc.core5.http.Header;
30-
import org.apache.hc.core5.http.HttpEntity;
31-
import org.apache.hc.core5.http.HttpHost;
32-
import org.apache.hc.core5.http.io.entity.EntityUtils;
33-
import org.apache.hc.core5.http.message.BasicHeader;
34-
import org.apache.hc.core5.util.TimeValue;
35-
import org.apache.hc.core5.util.Timeout;
21+
import org.apache.http.Header;
22+
import org.apache.http.HttpEntity;
23+
import org.apache.http.HttpHost;
24+
import org.apache.http.message.BasicHeader;
25+
import org.apache.http.util.EntityUtils;
26+
import org.elasticsearch.client.Request;
27+
import org.elasticsearch.client.Response;
28+
import org.elasticsearch.client.RestClient;
29+
import org.elasticsearch.client.RestClientBuilder;
3630
import org.slf4j.Logger;
3731
import org.slf4j.LoggerFactory;
3832

3933
import java.io.IOException;
4034
import java.io.UncheckedIOException;
4135
import java.time.Duration;
4236
import java.util.Map;
43-
import java.util.concurrent.TimeUnit;
4437

4538
/**
4639
* @author neo
@@ -53,47 +46,26 @@ public class ElasticSearchImpl implements ElasticSearch {
5346
public int maxResultWindow = 10000;
5447
ElasticsearchClient client;
5548
Header authHeader;
56-
private Rest5Client restClient;
49+
private RestClient restClient;
5750
private ObjectMapper mapper;
5851

5952
// initialize will be called in startup hook, no need to synchronize
6053
public void initialize() {
61-
if (client == null) {
62-
// initialize can be called by initSearch explicitly during test,
63-
Rest5ClientBuilder builder = Rest5Client.builder(hosts);
54+
if (client == null) { // initialize can be called by initSearch explicitly during test,
55+
RestClientBuilder builder = RestClient.builder(hosts);
6456
if (authHeader != null) {
6557
builder.setDefaultHeaders(new Header[]{authHeader});
6658
}
67-
68-
RequestConfig requestConfig = RequestConfig.custom()
69-
.setConnectionRequestTimeout(Timeout.ofSeconds(5))
70-
.setResponseTimeout((int) timeout.toMillis(), TimeUnit.MILLISECONDS)
71-
.build();
72-
73-
ConnectionConfig connectionConfig = ConnectionConfig.custom()
74-
.setConnectTimeout(Timeout.ofSeconds(5))
75-
.setSocketTimeout((int) timeout.toMillis(), TimeUnit.MILLISECONDS)
76-
.setTimeToLive(TimeValue.ofSeconds(30))
77-
.build();
78-
79-
PoolingAsyncClientConnectionManager connectionManager =
80-
PoolingAsyncClientConnectionManagerBuilder.create()
81-
.setDefaultConnectionConfig(connectionConfig)
82-
.setMaxConnPerRoute(100)
83-
.setMaxConnTotal(100)
84-
.build();
85-
86-
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create()
87-
.setDefaultRequestConfig(requestConfig)
88-
.setConnectionManager(connectionManager)
89-
.setUserAgent("elasticsearch-java/core-ng")
90-
.setThreadFactory(Thread.ofPlatform().name("elasticsearch-rest-client-", 0).factory());
91-
92-
builder.setHttpClient(httpClientBuilder.build());
93-
59+
builder.setRequestConfigCallback(config -> config.setConnectionRequestTimeout(5_000) // timeout of requesting connection from connection pool
60+
.setConnectTimeout(5_000) // 5s, usually es is within same network, use shorter timeout to fail fast
61+
.setSocketTimeout((int) timeout.toMillis()));
62+
builder.setHttpClientConfigCallback(config -> config.setMaxConnTotal(100)
63+
.setMaxConnPerRoute(100)
64+
.setKeepAliveStrategy((response, context) -> Duration.ofSeconds(30).toMillis())
65+
.addInterceptorFirst(new ElasticSearchLogInterceptor()));
9466
restClient = builder.build();
9567
mapper = JSONMapper.builder().serializationInclusion(JsonInclude.Include.NON_NULL).build();
96-
client = new ElasticsearchClient(new Rest5ClientTransport(restClient, new JacksonJsonpMapper(mapper), null, new ElasticSearchLogInstrumentation()));
68+
client = new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper(mapper), null, NoopInstrumentation.INSTANCE));
9769
}
9870
}
9971

@@ -212,7 +184,7 @@ public void deleteIndex(String index) {
212184
public ClusterStateResponse state() {
213185
var watch = new StopWatch();
214186
try {
215-
return client.cluster().state(builder -> builder.metric("metadata")).state().to(ClusterStateResponse.class);
187+
return client.cluster().state(builder -> builder.metric("metadata")).valueBody().to(ClusterStateResponse.class);
216188
} catch (IOException e) {
217189
throw new UncheckedIOException(e);
218190
} catch (ElasticsearchException e) {

core-ng-search/src/main/java/core/framework/search/impl/ElasticSearchLogInstrumentation.java

Lines changed: 0 additions & 81 deletions
This file was deleted.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package core.framework.search.impl;
2+
3+
import core.framework.internal.log.filter.LogParam;
4+
import org.apache.http.HttpEntity;
5+
import org.apache.http.HttpEntityEnclosingRequest;
6+
import org.apache.http.HttpRequest;
7+
import org.apache.http.HttpRequestInterceptor;
8+
import org.apache.http.RequestLine;
9+
import org.apache.http.protocol.HttpContext;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
import java.io.IOException;
14+
import java.io.InputStreamReader;
15+
import java.io.Reader;
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.Set;
18+
19+
/**
20+
* @author neo
21+
*/
22+
public class ElasticSearchLogInterceptor implements HttpRequestInterceptor {
23+
private final Logger logger = LoggerFactory.getLogger(ElasticSearchLogInterceptor.class);
24+
25+
// only request can be logged, apache http client execute in different thread (NIO), and response entity can only be consumed once
26+
@Override
27+
public void process(HttpRequest request, HttpContext context) {
28+
RequestLine requestLine = request.getRequestLine();
29+
logger.debug("[request] method={}, uri={}", requestLine.getMethod(), requestLine.getUri());
30+
if (request instanceof final HttpEntityEnclosingRequest entityRequest) {
31+
HttpEntity entity = entityRequest.getEntity();
32+
if (entity != null) {
33+
logger.debug("[request] body={}", new BodyParam(entity));
34+
}
35+
}
36+
}
37+
38+
record BodyParam(HttpEntity entity) implements LogParam {
39+
@Override
40+
public void append(StringBuilder builder, Set<String> maskedFields, int maxParamLength) {
41+
// refer to co.elastic.clients.transport.rest_client.RestClientTransport.prepareLowLevelRequest
42+
// it always uses ByteArrayEntity, thus always has content length
43+
try (Reader reader = new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)) {
44+
char[] buffer;
45+
46+
if (entity.isChunked()) {
47+
// with bulkIndex, es uses co.elastic.clients.transport.rest_client.MultiBufferEntity, which is chunked and content length = -1
48+
buffer = new char[maxParamLength + 1];
49+
} else {
50+
buffer = new char[Math.min((int) entity.getContentLength(), maxParamLength + 1)];
51+
}
52+
int read = reader.read(buffer);
53+
54+
if (read > 0) builder.append(buffer, 0, Math.min(read, maxParamLength)); // read = -1 if nothing read into buffer
55+
if (read > maxParamLength) {
56+
builder.append("...(truncated)");
57+
}
58+
} catch (IOException e) {
59+
throw new Error(e); // not expected io exception, as it's from ByteArrayEntity
60+
}
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)