Skip to content

Commit 8db891a

Browse files
authored
Add support for apache/kafka (#8416)
Fixes #8398
1 parent cda5609 commit 8db891a

File tree

5 files changed

+246
-112
lines changed

5 files changed

+246
-112
lines changed

docs/modules/kafka.md

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1-
# Kafka Containers
1+
# Kafka Module
22

33
Testcontainers can be used to automatically instantiate and manage [Apache Kafka](https://kafka.apache.org) containers.
4-
More precisely Testcontainers uses the official Docker images for [Confluent OSS Platform](https://hub.docker.com/r/confluentinc/cp-kafka/)
4+
5+
Currently, two different Kafka images are supported:
6+
7+
* `org.testcontainers.containers.KafkaContainer` supports
8+
[confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
9+
* `org.testcontainers.kafka.KafkaContainer` supports [apache/kafka](https://hub.docker.com/r/apache/kafka/)
510

611
## Benefits
712

@@ -24,6 +29,9 @@ Now your tests or any other process running on your machine can get access to ru
2429
<!--/codeinclude-->
2530

2631
## Options
32+
33+
!!! note
34+
The options below are only available for `org.testcontainers.containers.KafkaContainer`
2735

2836
### <a name="zookeeper"></a> Using external Zookeeper
2937

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package org.testcontainers.kafka;
2+
3+
import com.github.dockerjava.api.command.InspectContainerResponse;
4+
import org.testcontainers.containers.GenericContainer;
5+
import org.testcontainers.containers.wait.strategy.Wait;
6+
import org.testcontainers.images.builder.Transferable;
7+
import org.testcontainers.utility.DockerImageName;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
/**
13+
* Testcontainers implementation for Apache Kafka.
14+
* <p>
15+
* Supported image: {@code apache/kafka}
16+
* <p>
17+
* Exposed ports: 9092
18+
*/
19+
public class KafkaContainer extends GenericContainer<KafkaContainer> {
20+
21+
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/kafka");
22+
23+
private static final int KAFKA_PORT = 9092;
24+
25+
private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
26+
27+
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
28+
29+
private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
30+
31+
public KafkaContainer(String imageName) {
32+
this(DockerImageName.parse(imageName));
33+
}
34+
35+
public KafkaContainer(DockerImageName dockerImageName) {
36+
super(dockerImageName);
37+
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
38+
39+
withExposedPorts(KAFKA_PORT);
40+
withEnv("CLUSTER_ID", DEFAULT_CLUSTER_ID);
41+
42+
withEnv(
43+
"KAFKA_LISTENERS",
44+
"PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094"
45+
);
46+
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
47+
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
48+
withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
49+
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
50+
51+
withEnv("KAFKA_NODE_ID", "1");
52+
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
53+
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
54+
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
55+
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
56+
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
57+
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
58+
59+
withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
60+
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
61+
}
62+
63+
@Override
64+
protected void configure() {
65+
String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
66+
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
67+
String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
68+
withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
69+
}
70+
71+
@Override
72+
protected void containerIsStarting(InspectContainerResponse containerInfo) {
73+
String brokerAdvertisedListener = String.format(
74+
"BROKER://%s:%s",
75+
containerInfo.getConfig().getHostName(),
76+
"9093"
77+
);
78+
List<String> advertisedListeners = new ArrayList<>();
79+
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
80+
advertisedListeners.add(brokerAdvertisedListener);
81+
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
82+
String command = "#!/bin/bash\n";
83+
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
84+
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);
85+
86+
command += "/etc/kafka/docker/run \n";
87+
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
88+
}
89+
90+
public String getBootstrapServers() {
91+
return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package org.testcontainers;
2+
3+
import com.google.common.collect.ImmutableMap;
4+
import org.apache.kafka.clients.admin.AdminClient;
5+
import org.apache.kafka.clients.admin.AdminClientConfig;
6+
import org.apache.kafka.clients.admin.NewTopic;
7+
import org.apache.kafka.clients.consumer.ConsumerConfig;
8+
import org.apache.kafka.clients.consumer.ConsumerRecord;
9+
import org.apache.kafka.clients.consumer.ConsumerRecords;
10+
import org.apache.kafka.clients.consumer.KafkaConsumer;
11+
import org.apache.kafka.clients.producer.KafkaProducer;
12+
import org.apache.kafka.clients.producer.ProducerConfig;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.config.SaslConfigs;
15+
import org.apache.kafka.common.serialization.StringDeserializer;
16+
import org.apache.kafka.common.serialization.StringSerializer;
17+
import org.rnorth.ducttape.unreliables.Unreliables;
18+
19+
import java.time.Duration;
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.Properties;
23+
import java.util.UUID;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.assertj.core.api.Assertions.tuple;
28+
29+
public class AbstractKafka {
30+
31+
private final ImmutableMap<String, String> properties = ImmutableMap.of(
32+
AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
33+
"SASL_PLAINTEXT",
34+
SaslConfigs.SASL_MECHANISM,
35+
"PLAIN",
36+
SaslConfigs.SASL_JAAS_CONFIG,
37+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
38+
);
39+
40+
protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
41+
testKafkaFunctionality(bootstrapServers, false, 1, 1);
42+
}
43+
44+
protected void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
45+
testKafkaFunctionality(bootstrapServers, true, 1, 1);
46+
}
47+
48+
protected void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
49+
throws Exception {
50+
ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
51+
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
52+
bootstrapServers
53+
);
54+
Properties adminClientProperties = new Properties();
55+
adminClientProperties.putAll(adminClientDefaultProperties);
56+
57+
ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
58+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
59+
bootstrapServers,
60+
ConsumerConfig.GROUP_ID_CONFIG,
61+
"tc-" + UUID.randomUUID(),
62+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
63+
"earliest"
64+
);
65+
Properties consumerProperties = new Properties();
66+
consumerProperties.putAll(consumerDefaultProperties);
67+
68+
ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
69+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
70+
bootstrapServers,
71+
ProducerConfig.CLIENT_ID_CONFIG,
72+
UUID.randomUUID().toString()
73+
);
74+
Properties producerProperties = new Properties();
75+
producerProperties.putAll(producerDefaultProperties);
76+
77+
if (authenticated) {
78+
adminClientProperties.putAll(this.properties);
79+
consumerProperties.putAll(this.properties);
80+
producerProperties.putAll(this.properties);
81+
}
82+
try (
83+
AdminClient adminClient = AdminClient.create(adminClientProperties);
84+
KafkaProducer<String, String> producer = new KafkaProducer<>(
85+
producerProperties,
86+
new StringSerializer(),
87+
new StringSerializer()
88+
);
89+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
90+
consumerProperties,
91+
new StringDeserializer(),
92+
new StringDeserializer()
93+
);
94+
) {
95+
String topicName = "messages-" + UUID.randomUUID();
96+
97+
Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
98+
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
99+
100+
consumer.subscribe(Collections.singletonList(topicName));
101+
102+
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
103+
104+
Unreliables.retryUntilTrue(
105+
10,
106+
TimeUnit.SECONDS,
107+
() -> {
108+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
109+
110+
if (records.isEmpty()) {
111+
return false;
112+
}
113+
114+
assertThat(records)
115+
.hasSize(1)
116+
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
117+
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
118+
119+
return true;
120+
}
121+
);
122+
123+
consumer.unsubscribe();
124+
}
125+
}
126+
}

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

+2-110
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,25 @@
55
import org.apache.kafka.clients.admin.AdminClient;
66
import org.apache.kafka.clients.admin.AdminClientConfig;
77
import org.apache.kafka.clients.admin.NewTopic;
8-
import org.apache.kafka.clients.consumer.ConsumerConfig;
9-
import org.apache.kafka.clients.consumer.ConsumerRecord;
10-
import org.apache.kafka.clients.consumer.ConsumerRecords;
11-
import org.apache.kafka.clients.consumer.KafkaConsumer;
12-
import org.apache.kafka.clients.producer.KafkaProducer;
13-
import org.apache.kafka.clients.producer.ProducerConfig;
14-
import org.apache.kafka.clients.producer.ProducerRecord;
158
import org.apache.kafka.common.config.SaslConfigs;
169
import org.apache.kafka.common.errors.SaslAuthenticationException;
1710
import org.apache.kafka.common.errors.TopicAuthorizationException;
18-
import org.apache.kafka.common.serialization.StringDeserializer;
19-
import org.apache.kafka.common.serialization.StringSerializer;
2011
import org.awaitility.Awaitility;
2112
import org.junit.Test;
22-
import org.rnorth.ducttape.unreliables.Unreliables;
13+
import org.testcontainers.AbstractKafka;
2314
import org.testcontainers.Testcontainers;
2415
import org.testcontainers.images.builder.Transferable;
2516
import org.testcontainers.utility.DockerImageName;
2617

27-
import java.time.Duration;
2818
import java.util.Collection;
2919
import java.util.Collections;
30-
import java.util.Properties;
3120
import java.util.UUID;
3221
import java.util.concurrent.TimeUnit;
3322

3423
import static org.assertj.core.api.Assertions.assertThat;
3524
import static org.assertj.core.api.Assertions.assertThatThrownBy;
36-
import static org.assertj.core.api.Assertions.tuple;
3725

38-
public class KafkaContainerTest {
26+
public class KafkaContainerTest extends AbstractKafka {
3927

4028
private static final DockerImageName KAFKA_TEST_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
4129

@@ -45,15 +33,6 @@ public class KafkaContainerTest {
4533
"confluentinc/cp-zookeeper:4.0.0"
4634
);
4735

48-
private final ImmutableMap<String, String> properties = ImmutableMap.of(
49-
AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
50-
"SASL_PLAINTEXT",
51-
SaslConfigs.SASL_MECHANISM,
52-
"PLAIN",
53-
SaslConfigs.SASL_JAAS_CONFIG,
54-
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
55-
);
56-
5736
@Test
5837
public void testUsage() throws Exception {
5938
try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)) {
@@ -344,91 +323,4 @@ private static String getJaasConfig() {
344323
"user_test=\"secret\";";
345324
return jaasConfig;
346325
}
347-
348-
private void testKafkaFunctionality(String bootstrapServers) throws Exception {
349-
testKafkaFunctionality(bootstrapServers, false, 1, 1);
350-
}
351-
352-
private void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
353-
testKafkaFunctionality(bootstrapServers, true, 1, 1);
354-
}
355-
356-
private void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
357-
throws Exception {
358-
ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
359-
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
360-
bootstrapServers
361-
);
362-
Properties adminClientProperties = new Properties();
363-
adminClientProperties.putAll(adminClientDefaultProperties);
364-
365-
ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
366-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
367-
bootstrapServers,
368-
ConsumerConfig.GROUP_ID_CONFIG,
369-
"tc-" + UUID.randomUUID(),
370-
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
371-
"earliest"
372-
);
373-
Properties consumerProperties = new Properties();
374-
consumerProperties.putAll(consumerDefaultProperties);
375-
376-
ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
377-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
378-
bootstrapServers,
379-
ProducerConfig.CLIENT_ID_CONFIG,
380-
UUID.randomUUID().toString()
381-
);
382-
Properties producerProperties = new Properties();
383-
producerProperties.putAll(producerDefaultProperties);
384-
385-
if (authenticated) {
386-
adminClientProperties.putAll(this.properties);
387-
consumerProperties.putAll(this.properties);
388-
producerProperties.putAll(this.properties);
389-
}
390-
try (
391-
AdminClient adminClient = AdminClient.create(adminClientProperties);
392-
KafkaProducer<String, String> producer = new KafkaProducer<>(
393-
producerProperties,
394-
new StringSerializer(),
395-
new StringSerializer()
396-
);
397-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
398-
consumerProperties,
399-
new StringDeserializer(),
400-
new StringDeserializer()
401-
);
402-
) {
403-
String topicName = "messages-" + UUID.randomUUID();
404-
405-
Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
406-
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
407-
408-
consumer.subscribe(Collections.singletonList(topicName));
409-
410-
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
411-
412-
Unreliables.retryUntilTrue(
413-
10,
414-
TimeUnit.SECONDS,
415-
() -> {
416-
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
417-
418-
if (records.isEmpty()) {
419-
return false;
420-
}
421-
422-
assertThat(records)
423-
.hasSize(1)
424-
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
425-
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
426-
427-
return true;
428-
}
429-
);
430-
431-
consumer.unsubscribe();
432-
}
433-
}
434326
}

0 commit comments

Comments
 (0)