Skip to content

Commit 4bce40c

Browse files
committed
* kafka: updated to 4.0.0
Signed-off-by: neo <[email protected]>
1 parent ed6dab9 commit 4bce40c

File tree

7 files changed

+36
-11
lines changed

7 files changed

+36
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* sse: log event count and size on sse:close action
99
* ws: remove ws support
1010
> websocket is not used anymore, use sse/ajax instead
11+
* kafka: updated to 4.0.0
1112

1213
### 9.1.7 (2/26/2025 - 3/6/2025)
1314

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ project("core-ng") {
3232
implementation("com.squareup.okio:okio:3.2.0") // okio 3.3.0 has synchronization issue with virtual thread
3333
implementation("org.jetbrains.kotlin:kotlin-stdlib:2.0.20")
3434
implementation("io.undertow:undertow-core:2.3.18.Final")
35-
implementation("org.apache.kafka:kafka-clients:3.9.0")
35+
implementation("org.apache.kafka:kafka-clients:4.0.0")
3636
compileOnly("org.jboss.logging:jboss-logging-annotations:2.2.1.Final")
3737
compileOnly("com.github.spotbugs:spotbugs-annotations:4.8.3")
3838
testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package core.framework.internal.kafka;
2+
3+
import org.apache.kafka.common.metrics.KafkaMetric;
4+
import org.apache.kafka.common.metrics.MetricsReporter;
5+
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
public class EmptyMetricsReporter implements MetricsReporter {
10+
@Override
11+
public void init(List<KafkaMetric> metrics) {
12+
}
13+
14+
@Override
15+
public void metricChange(KafkaMetric metric) {
16+
}
17+
18+
@Override
19+
public void metricRemoval(KafkaMetric metric) {
20+
}
21+
22+
@Override
23+
public void close() {
24+
}
25+
26+
@Override
27+
public void configure(Map<String, ?> configs) {
28+
}
29+
}

core-ng/src/main/java/core/framework/internal/kafka/MessageListener.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ String threadName(String name) {
9797
return "kafka-listener" + (name == null ? "" : "-" + name);
9898
}
9999

100-
@SuppressWarnings("deprecation")
101100
Consumer<String, byte[]> createConsumer() {
102101
var watch = new StopWatch();
103102
try {
@@ -114,7 +113,7 @@ Consumer<String, byte[]> createConsumer() {
114113
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPollBytes);
115114
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minPollBytes);
116115
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, (int) maxWaitTime.toMillis());
117-
config.put(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, Boolean.FALSE);
116+
config.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, EmptyMetricsReporter.class.getName());
118117
config.put(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, Boolean.FALSE);
119118
Consumer<String, byte[]> consumer = new KafkaConsumer<>(config, new KeyDeserializer(), new ByteArrayDeserializer());
120119
consumerMetrics.add(consumer.metrics());

core-ng/src/main/java/core/framework/internal/kafka/MessageProducer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public void send(ProducerRecord<byte[], byte[]> record) {
4343
producer.send(record, new KafkaCallback(record));
4444
}
4545

46-
@SuppressWarnings("deprecation")
4746
Producer<byte[], byte[]> createProducer(KafkaURI uri) {
4847
var watch = new StopWatch();
4948
try {
@@ -55,7 +54,7 @@ Producer<byte[], byte[]> createProducer(KafkaURI uri) {
5554
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5_000L, // 5s
5655
ProducerConfig.MAX_BLOCK_MS_CONFIG, 30_000L, // 30s, metadata update timeout, shorter than default, to get exception sooner if kafka is not available
5756
ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize,
58-
ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, Boolean.FALSE,
57+
ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, EmptyMetricsReporter.class.getName(),
5958
ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, Boolean.FALSE);
6059

6160
var serializer = new ByteArraySerializer();

core-ng/src/main/java/core/framework/internal/log/DefaultLoggerFactory.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ private Logger createLogger(String name) {
2323

2424
private LogLevel infoLevel(String name) {
2525
// kafka log info for every producer/consumer, to reduce verbosity
26-
if ("org.apache.kafka.clients.consumer.ConsumerConfig".equals(name)
27-
|| "org.apache.kafka.clients.producer.ProducerConfig".equals(name)
28-
|| "org.apache.kafka.clients.admin.AdminClientConfig".equals(name)
29-
|| "org.apache.kafka.common.utils.AppInfoParser".equals(name)) {
26+
if (name.startsWith("org.apache.kafka.")) {
3027
return LogLevel.WARN;
3128
}
3229
return LogLevel.INFO;

docker/kafka/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
kafka:
3-
image: apache/kafka:3.9.0
3+
image: apache/kafka-native:4.0.0
44
hostname: dev.internal
55
ports:
66
- 9092:9092
@@ -26,7 +26,7 @@ services:
2626
volumes:
2727
- data:/var/lib/kafka/data
2828
# mm:
29-
# image: apache/kafka:3.9.0
29+
# image: apache/kafka:4.0.0
3030
# volumes:
3131
# - ./connect-mirror-maker.properties:/opt/kafka/config/connect-mirror-maker.properties
3232
# entrypoint: ["/bin/bash", "-c", "/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/connect-mirror-maker.properties"]

0 commit comments

Comments
 (0)