Commit 8cffdbf

feat: additional advertised listeners/broker addresses for kafka
1 parent 35bf0cd commit 8cffdbf

File tree

4 files changed: +564, -23 lines


docs/modules/kafka.md

Lines changed: 40 additions & 0 deletions
@@ -61,6 +61,15 @@ The Kafka container will be started using a custom shell script:
[Init script](../../modules/kafka/kafka.go) inside_block:starterScript
<!--/codeinclude-->

That script sets the advertised listeners to these values:

<!--codeinclude-->
[Advertised Listeners](../../modules/kafka/kafka.go) inside_block:advertisedListeners
<!--/codeinclude-->

KafkaContainer provides methods to read the broker addresses for different
connectivity environments.

#### Environment variables

The environment variables that are already set by default are:

@@ -82,3 +91,34 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containing the
<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
<!--/codeinclude-->
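
Not part of the committed diff, but for context: a minimal sketch of wiring the slice returned by `Brokers(ctx)` into a producer running on the test host. It reuses the `github.com/IBM/sarama` client already used by the new example tests; the topic name and payload are purely illustrative.

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
	"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func main() {
	ctx := context.Background()

	// Same image and cluster ID as the examples added in this commit.
	kafkaContainer, err := kafka.Run(ctx,
		"confluentinc/confluent-local:7.5.0",
		kafka.WithClusterID("test-cluster"),
	)
	if err != nil {
		log.Fatalf("failed to start container: %s", err)
	}
	defer func() {
		if err := kafkaContainer.Terminate(ctx); err != nil {
			log.Fatalf("failed to terminate container: %s", err)
		}
	}()

	// Brokers(ctx) returns host-reachable addresses, e.g. "localhost:<mapped port>".
	brokers, err := kafkaContainer.Brokers(ctx)
	if err != nil {
		log.Fatal(err)
	}

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Illustrative topic and message.
	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello from the host"),
	}); err != nil {
		log.Fatal(err)
	}
}

These `host:port` values are only reachable from the host; the sections below cover the container-to-container variants.
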
#### BrokersByHostDockerInternal

The `BrokersByHostDockerInternal(ctx)` method returns the Kafka brokers as a
string slice, containing the hostname `host.docker.internal` and the random
host port mapped from Kafka's public port (`19092/tcp`).

This method is useful when you need to run additional containers that need to
connect to Kafka.

<!--codeinclude-->
[Get Kafka brokers by host.docker.internal](../../modules/kafka/examples_test.go) inside_block:getBrokersByHostDockerInternal
<!--/codeinclude-->
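
Also outside the diff: a condensed sketch of the consumer side of the full example included below, assuming a hypothetical `runKcat` helper. The essential detail is the `host.docker.internal:host-gateway` extra host, without which the hostname does not resolve inside the consumer container.

package examples

import (
	"context"
	"strings"

	"github.com/docker/docker/api/types/container"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
)

// runKcat (hypothetical helper) starts a one-shot kafkacat consumer pointed at
// the addresses returned by BrokersByHostDockerInternal(ctx).
func runKcat(ctx context.Context, brokers []string, topic string) (testcontainers.Container, error) {
	return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:      "confluentinc/cp-kafkacat",
			Entrypoint: []string{"kafkacat"},
			Cmd:        []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
			WaitingFor: wait.ForExit(),
			// host.docker.internal must resolve inside this container,
			// hence the host-gateway entry.
			HostConfigModifier: func(hc *container.HostConfig) {
				hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
			},
		},
		Started: true,
	})
}

A test would pass the result of `kafkaContainer.BrokersByHostDockerInternal(ctx)` as `brokers` and read the consumed message back from the container logs, as the full example in this commit does.
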
#### BrokersByContainerName

The `BrokersByContainerName(ctx)` method returns the Kafka brokers as a string
slice, addressed by the container's name (e.g. `charming_dijkstra:19093`). This
method is useful when you need to run additional containers that need to connect
to Kafka.

To use these broker addresses, you should run all the containers inside the same
Docker network.

<!--codeinclude-->
[Start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka
<!--/codeinclude-->

<!--codeinclude-->
[Get Kafka brokers by container name](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kcat
<!--/codeinclude-->
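
Again outside the diff: a condensed sketch of the same-network wiring, assuming a hypothetical `consumeByName` helper. The container-name addresses only resolve for containers attached to the Docker network the Kafka container joined.

package examples

import (
	"context"
	"strings"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
)

// consumeByName (hypothetical helper) starts a one-shot kafkacat consumer on
// the given Docker network, pointed at the addresses returned by
// BrokersByContainerName(ctx), e.g. "charming_dijkstra:19093".
func consumeByName(ctx context.Context, networkName string, brokers []string, topic string) (testcontainers.Container, error) {
	return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:      "confluentinc/cp-kafkacat",
			Entrypoint: []string{"kafkacat"},
			Cmd:        []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
			WaitingFor: wait.ForExit(),
			// The container name is only resolvable on the network the Kafka
			// container joined, so the consumer must join it too.
			Networks: []string{networkName},
		},
		Started: true,
	})
}

In the example test below, the network comes from `network.New(ctx)` and the Kafka container joins it via `network.WithNetwork(nil, net)`, so `networkName` would be `net.Name`.
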

modules/kafka/examples_test.go

Lines changed: 303 additions & 0 deletions
@@ -1,12 +1,19 @@
package kafka_test

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"strings"

	"github.com/IBM/sarama"
	"github.com/docker/docker/api/types/container"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/kafka"
	"github.com/testcontainers/testcontainers-go/network"
	"github.com/testcontainers/testcontainers-go/wait"
)

func ExampleRun() {
@@ -41,3 +48,299 @@ func ExampleRun() {
	// test-cluster
	// true
}
func ExampleKafkaContainer_BrokersByHostDockerInternal() {
	ctx := context.Background()

	kafkaContainer, err := kafka.Run(ctx,
		"confluentinc/confluent-local:7.5.0",
		kafka.WithClusterID("test-cluster"),
	)
	if err != nil {
		log.Fatalf("failed to start container: %s", err)
	}

	// Clean up the container after
	defer func() {
		if err := kafkaContainer.Terminate(ctx); err != nil {
			log.Fatalf("failed to terminate container: %s", err)
		}
	}()

	{
		state, err := kafkaContainer.State(ctx)
		if err != nil {
			log.Fatalf("failed to get container state: %s", err) // nolint:gocritic
		}

		fmt.Println(kafkaContainer.ClusterID)
		fmt.Println(state.Running)
	}

	const topic = "example-topic"

	// Produce a message from the host that will be read by a consumer in another docker container
	{
		brokers, err := kafkaContainer.Brokers(ctx)
		if err != nil {
			log.Fatal(err)
		}

		config := sarama.NewConfig()
		config.Producer.Return.Successes = true
		producer, err := sarama.NewSyncProducer(brokers, config)
		if err != nil {
			log.Fatal(err)
		}

		if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.StringEncoder("key"),
			Value: sarama.StringEncoder("example_message_value"),
		}); err != nil {
			log.Fatal(err)
		}
	}

	// getBrokersByHostDockerInternal {
	brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Run another container that can connect to the kafka container via hostname "host.docker.internal"
	kcat, err := testcontainers.GenericContainer(
		ctx,
		testcontainers.GenericContainerRequest{
			ContainerRequest: testcontainers.ContainerRequest{
				Image:      "confluentinc/cp-kafkacat",
				Entrypoint: []string{"kafkacat"},
				Cmd:        []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
				WaitingFor: wait.ForExit(),

				// Add host.docker.internal to the consumer container so it can contact the kafka brokers
				HostConfigModifier: func(hc *container.HostConfig) {
					hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
				},
			},
			Started: true,
		},
	)
	if err != nil {
		log.Fatalf("kafkacat error: %v", err)
	}

	lr, err := kcat.Logs(ctx)
	if err != nil {
		log.Fatalf("kafkacat logs error: %v", err)
	}

	logs, err := io.ReadAll(lr)
	if err != nil {
		log.Fatalf("kafkacat logs read error: %v", err)
	}

	fmt.Println("read message:", string(bytes.TrimSpace(logs)))
	// }

	// Output:
	// test-cluster
	// true
	// read message: example_message_value
}

func ExampleKafkaContainer_BrokersByContainerName() {
	ctx := context.Background()

	// getBrokersByContainerName_Kafka {
	net, err := network.New(ctx)
	if err != nil {
		log.Fatalf("failed to create network: %s", err)
	}

	kafkaContainer, err := kafka.Run(ctx,
		"confluentinc/confluent-local:7.5.0",
		kafka.WithClusterID("test-cluster"),
		network.WithNetwork(nil, net), // Run kafka test container in a new docker network
	)
	if err != nil {
		log.Fatalf("failed to start container: %s", err)
	}
	// }

	// Clean up the container after
	defer func() {
		if err := kafkaContainer.Terminate(ctx); err != nil {
			log.Fatalf("failed to terminate container: %s", err)
		}
	}()

	{
		state, err := kafkaContainer.State(ctx)
		if err != nil {
			log.Fatalf("failed to get container state: %s", err) // nolint:gocritic
		}

		fmt.Println(kafkaContainer.ClusterID)
		fmt.Println(state.Running)
	}

	const topic = "example-topic"

	// Produce a message from the host that will be read by a consumer in another docker container
	{
		brokers, err := kafkaContainer.Brokers(ctx)
		if err != nil {
			log.Fatal(err)
		}

		config := sarama.NewConfig()
		config.Producer.Return.Successes = true
		producer, err := sarama.NewSyncProducer(brokers, config)
		if err != nil {
			log.Fatal(err)
		}

		if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.StringEncoder("key"),
			Value: sarama.StringEncoder("example_message_value"),
		}); err != nil {
			log.Fatal(err)
		}
	}

	// getBrokersByContainerName_Kcat {
	brokers, err := kafkaContainer.BrokersByContainerName(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Run another container that can connect to the kafka container via the kafka container's name
	kcat, err := testcontainers.GenericContainer(
		ctx,
		testcontainers.GenericContainerRequest{
			ContainerRequest: testcontainers.ContainerRequest{
				Image:      "confluentinc/cp-kafkacat",
				Entrypoint: []string{"kafkacat"},
				Cmd:        []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
				WaitingFor: wait.ForExit(),
				Networks:   []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer
			},
			Started: true,
		},
	)
	if err != nil {
		log.Fatalf("kafkacat error: %v", err)
	}

	lr, err := kcat.Logs(ctx)
	if err != nil {
		log.Fatalf("kafkacat logs error: %v", err)
	}

	logs, err := io.ReadAll(lr)
	if err != nil {
		log.Fatalf("kafkacat logs read error: %v", err)
	}

	fmt.Println("read message:", string(bytes.TrimSpace(logs)))
	// }

	// Output:
	// test-cluster
	// true
	// read message: example_message_value
}

func ExampleKafkaContainer_BrokersByContainerId() {
	ctx := context.Background()

	net, err := network.New(ctx)
	if err != nil {
		log.Fatalf("failed to create network: %s", err)
	}

	kafkaContainer, err := kafka.Run(ctx,
		"confluentinc/confluent-local:7.5.0",
		kafka.WithClusterID("test-cluster"),
		network.WithNetwork(nil, net), // Run kafka test container in a new docker network
	)
	if err != nil {
		log.Fatalf("failed to start container: %s", err)
	}

	// Clean up the container after
	defer func() {
		if err := kafkaContainer.Terminate(ctx); err != nil {
			log.Fatalf("failed to terminate container: %s", err)
		}
	}()

	{
		state, err := kafkaContainer.State(ctx)
		if err != nil {
			log.Fatalf("failed to get container state: %s", err) // nolint:gocritic
		}

		fmt.Println(kafkaContainer.ClusterID)
		fmt.Println(state.Running)
	}

	const topic = "example-topic"

	// Produce a message from the host that will be read by a consumer in another docker container
	{
		brokers, err := kafkaContainer.Brokers(ctx)
		if err != nil {
			log.Fatal(err)
		}

		config := sarama.NewConfig()
		config.Producer.Return.Successes = true
		producer, err := sarama.NewSyncProducer(brokers, config)
		if err != nil {
			log.Fatal(err)
		}

		if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.StringEncoder("key"),
			Value: sarama.StringEncoder("example_message_value"),
		}); err != nil {
			log.Fatal(err)
		}
	}

	brokers, err := kafkaContainer.BrokersByContainerId(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Run another container that can connect to the kafka container via the kafka container's ContainerID
	kcat, err := testcontainers.GenericContainer(
		ctx,
		testcontainers.GenericContainerRequest{
			ContainerRequest: testcontainers.ContainerRequest{
				Image:      "confluentinc/cp-kafkacat",
				Entrypoint: []string{"kafkacat"},
				Cmd:        []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
				WaitingFor: wait.ForExit(),
				Networks:   []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer
			},
			Started: true,
		},
	)
	if err != nil {
		log.Fatalf("kafkacat error: %v", err)
	}

	lr, err := kcat.Logs(ctx)
	if err != nil {
		log.Fatalf("kafkacat logs error: %v", err)
	}

	logs, err := io.ReadAll(lr)
	if err != nil {
		log.Fatalf("kafkacat logs read error: %v", err)
	}

	fmt.Println("read message:", string(bytes.TrimSpace(logs)))

	// Output:
	// test-cluster
	// true
	// read message: example_message_value
}
