Skip to content

Commit cc83d99

Browse files
authored
Merge pull request #4 from danielkec/7683-kafka-native-image
Kafka connector native image updates
2 parents 717ccd4 + f2f4b13 commit cc83d99

File tree

14 files changed

+58
-483
lines changed

14 files changed

+58
-483
lines changed

dependencies/pom.xml

+1-10
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
<version.lib.jersey>3.1.3</version.lib.jersey>
9595
<version.lib.jgit>6.7.0.202309050840-r</version.lib.jgit>
9696
<version.lib.junit>5.9.3</version.lib.junit>
97-
<version.lib.kafka>3.4.0</version.lib.kafka>
97+
<version.lib.kafka>3.5.1</version.lib.kafka>
9898
<version.lib.kotlin>1.8.0</version.lib.kotlin>
9999
<version.lib.log4j>2.18.0</version.lib.log4j>
100100
<version.lib.logback>1.4.0</version.lib.logback>
@@ -944,15 +944,6 @@
944944
<groupId>org.apache.kafka</groupId>
945945
<artifactId>kafka-clients</artifactId>
946946
<version>${version.lib.kafka}</version>
947-
<!-- Snip transitive dependency on Snappy (which should be optional anyways). -->
948-
<!-- This can be removed once kafka-clients is upgraded -->
949-
<!-- to 3.4.2 or newer. See https://issues.apache.org/jira/browse/KAFKA-15096 -->
950-
<exclusions>
951-
<exclusion>
952-
<groupId>org.xerial.snappy</groupId>
953-
<artifactId>snappy-java</artifactId>
954-
</exclusion>
955-
</exclusions>
956947
</dependency>
957948
<dependency>
958949
<!-- required transitively by okhttp (used in OpenTelemetry through Jaeger) -->

examples/messaging/docker/kafka/Dockerfile.kafka

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2019, 2021 Oracle and/or its affiliates.
2+
# Copyright (c) 2019, 2023 Oracle and/or its affiliates.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -14,12 +14,12 @@
1414
# limitations under the License.
1515
#
1616

17-
FROM openjdk:17-jdk-slim-buster
17+
FROM container-registry.oracle.com/java/openjdk:21
1818

1919
ENV VERSION=2.7.0
2020
ENV SCALA_VERSION=2.13
2121

22-
RUN apt-get -qq update && apt-get -qq -y install bash curl wget netcat jq
22+
RUN dnf update && dnf -y install wget jq nc
2323

2424
RUN REL_PATH=kafka/${VERSION}/kafka_${SCALA_VERSION}-${VERSION}.tgz \
2525
&& BACKUP_ARCHIVE=https://archive.apache.org/dist/ \

examples/messaging/docker/kafka/init_topics.sh

+32-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/bash
22
#
3-
# Copyright (c) 2020, 2021 Oracle and/or its affiliates.
3+
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.
@@ -40,9 +40,39 @@ while sleep 2; do
4040
--replication-factor 1 \
4141
--partitions 10 \
4242
--topic messaging-test-topic-2
43+
bash $KAFKA_TOPICS \
44+
--create \
45+
--replication-factor 1 \
46+
--partitions 10 \
47+
--config compression.type=snappy \
48+
--topic messaging-test-topic-snappy-compressed
49+
bash $KAFKA_TOPICS \
50+
--create \
51+
--replication-factor 1 \
52+
--partitions 10 \
53+
--config compression.type=lz4 \
54+
--topic messaging-test-topic-lz4-compressed
55+
bash $KAFKA_TOPICS \
56+
--create \
57+
--replication-factor 1 \
58+
--partitions 10 \
59+
--config compression.type=zstd \
60+
--topic messaging-test-topic-zstd-compressed
61+
bash $KAFKA_TOPICS \
62+
--create \
63+
--replication-factor 1 \
64+
--partitions 10 \
65+
--config compression.type=gzip \
66+
--topic messaging-test-topic-gzip-compressed
4367

4468
echo
45-
echo "Example topics messaging-test-topic-1 and messaging-test-topic-2 created"
69+
echo "Example topics created:"
70+
echo " messaging-test-topic-1"
71+
echo " messaging-test-topic-2"
72+
echo " messaging-test-topic-snappy-compressed"
73+
echo " messaging-test-topic-lz4-compressed"
74+
echo " messaging-test-topic-zstd-compressed"
75+
echo " messaging-test-topic-gzip-compressed"
4676
echo
4777
echo "================== Kafka is ready, stop it with Ctrl+C =================="
4878
exit 0

examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ class SendingService implements HttpService {
3535

3636
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
3737
String topic = config.get("app.kafka.topic").asString().get();
38+
String compression = config.get("app.kafka.compression").asString().orElse("none");
3839

3940
// Prepare channel for connecting processor -> kafka connector with specific subscriber configuration,
4041
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
4142
Channel<String> toKafka = Channel.<String>builder()
4243
.subscriberConfig(KafkaConnector.configBuilder()
4344
.bootstrapServers(kafkaServer)
4445
.topic(topic)
46+
.compressionType(compression)
4547
.keySerializer(StringSerializer.class)
4648
.valueSerializer(StringSerializer.class)
4749
.build())

examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void onOpen(WsSession session) {
4747

4848
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
4949
String topic = config.get("app.kafka.topic").asString().get();
50+
String compression = config.get("app.kafka.compression").asString().orElse("none");
5051

5152
// Prepare channel for connecting kafka connector with specific publisher configuration -> listener,
5253
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
@@ -60,6 +61,7 @@ public void onOpen(WsSession session) {
6061
.enableAutoCommit(true)
6162
.keyDeserializer(StringDeserializer.class)
6263
.valueDeserializer(StringDeserializer.class)
64+
.compressionType(compression)
6365
.build()
6466
)
6567
.build();
@@ -72,7 +74,7 @@ public void onOpen(WsSession session) {
7274
.listener(fromKafka, payload -> {
7375
System.out.println("Kafka says: " + payload);
7476
// Send message received from Kafka over websocket
75-
session.send(payload, false);
77+
session.send(payload, true);
7678
})
7779
.build()
7880
.start();

examples/messaging/kafka-websocket-se/src/main/resources/application.yaml

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2020 Oracle and/or its affiliates.
2+
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -17,7 +17,11 @@
1717
app:
1818
kafka:
1919
bootstrap.servers: localhost:9092
20-
topic: messaging-test-topic-1
20+
compression: snappy
21+
# compression: lz4
22+
# compression: zstd
23+
# compression: gzip
24+
topic: messaging-test-topic-${app.kafka.compression}-compressed
2125

2226
server:
2327
port: 7001

messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java

-38
This file was deleted.

messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java

-155
This file was deleted.

0 commit comments

Comments
 (0)