Skip to content

Commit 3f3e30c

Browse files
committed
Add example for Sarama consumer and producer
1 parent 0933666 commit 3f3e30c

File tree

7 files changed

+415
-0
lines changed

7 files changed

+415
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Kafka Sarama instrumentation example
2+
3+
A Kafka producer and consumer using Sarama with instrumentation.
4+
5+
These instructions expect you have
6+
[docker-compose](https://docs.docker.com/compose/) installed.
7+
8+
Bring up the `Kafka` and `ZooKeeper` services to run the
9+
example:
10+
11+
```sh
12+
docker-compose up -d zoo kafka
13+
```
14+
15+
Then up the `kafka-producer` service to produce a message into Kafka:
16+
17+
```sh
18+
docker-compose up kafka-producer
19+
```
20+
21+
At last, up the `kafka-consumer` service to consume messages from Kafka:
22+
23+
```sh
24+
docker-compose up kafka-consumer
25+
```
26+
27+
Shut down the services when you are finished with the example:
28+
29+
```sh
30+
docker-compose down
31+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
FROM golang:alpine AS base
15+
COPY . /src/
16+
WORKDIR /src/instrumentation/github.com/Shopify/sarama
17+
18+
FROM base AS kafka-consumer
19+
RUN go install ./example/consumer/consumer.go
20+
CMD ["/go/bin/consumer"]
21+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"context"
19+
"flag"
20+
"log"
21+
"os"
22+
"strings"
23+
"time"
24+
25+
"github.com/Shopify/sarama"
26+
27+
"go.opentelemetry.io/otel/api/global"
28+
"go.opentelemetry.io/otel/api/propagation"
29+
"go.opentelemetry.io/otel/api/standard"
30+
"go.opentelemetry.io/otel/api/trace"
31+
32+
saramatrace "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
33+
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/example"
34+
)
35+
36+
var (
37+
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
38+
)
39+
40+
func main() {
41+
example.InitTracer()
42+
flag.Parse()
43+
44+
if *brokers == "" {
45+
flag.PrintDefaults()
46+
os.Exit(1)
47+
}
48+
49+
brokerList := strings.Split(*brokers, ",")
50+
log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
51+
52+
startConsumerGroup(brokerList)
53+
54+
select {}
55+
}
56+
57+
func startConsumerGroup(brokerList []string) {
58+
consumerGroupHandler := Consumer{}
59+
// Wrap instrumentation
60+
handler := saramatrace.WrapConsumerGroupHandler("example-consumer", &consumerGroupHandler)
61+
62+
config := sarama.NewConfig()
63+
config.Version = sarama.V2_5_0_0
64+
config.Consumer.Offsets.Initial = sarama.OffsetOldest
65+
66+
// Create consumer group
67+
consumerGroup, err := sarama.NewConsumerGroup(brokerList, "example", config)
68+
if err != nil {
69+
log.Fatalln("Failed to start sarama consumer group:", err)
70+
}
71+
72+
err = consumerGroup.Consume(context.Background(), []string{example.KafkaTopic}, handler)
73+
if err != nil {
74+
log.Fatalln("Failed to consume via handler:", err)
75+
}
76+
}
77+
78+
func printMessage(msg *sarama.ConsumerMessage) {
79+
// Extract tracing info from message
80+
ctx := propagation.ExtractHTTP(context.Background(), global.Propagators(), saramatrace.NewConsumerMessageCarrier(msg))
81+
82+
tr := global.Tracer("consumer")
83+
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
84+
standard.MessagingOperationProcess,
85+
))
86+
defer span.End()
87+
88+
// Emulate Work loads
89+
time.Sleep(1 * time.Second)
90+
91+
log.Println("Successful to read message: ", string(msg.Value))
92+
}
93+
94+
// Consumer represents a Sarama consumer group consumer
95+
type Consumer struct {
96+
}
97+
98+
// Setup is run at the beginning of a new session, before ConsumeClaim
99+
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
100+
return nil
101+
}
102+
103+
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
104+
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
105+
return nil
106+
}
107+
108+
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
109+
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
110+
// NOTE:
111+
// Do not move the code below to a goroutine.
112+
// The `ConsumeClaim` itself is called within a goroutine, see:
113+
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
114+
for message := range claim.Messages() {
115+
printMessage(message)
116+
session.MarkMessage(message, "")
117+
}
118+
119+
return nil
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
version: "3.7"
15+
services:
16+
zoo:
17+
image: zookeeper:3.4.9
18+
hostname: zoo
19+
ports:
20+
- "2181:2181"
21+
environment:
22+
ZOO_MY_ID: 1
23+
ZOO_PORT: 2181
24+
ZOO_SERVERS: server.1=zoo:2888:3888
25+
networks:
26+
- example
27+
kafka:
28+
# Kafka version 2.5.0
29+
image: confluentinc/cp-kafka:5.5.0
30+
hostname: kafka
31+
ports:
32+
- "9092:9092"
33+
environment:
34+
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
35+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
36+
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
37+
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
38+
KAFKA_BROKER_ID: 1
39+
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
40+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
41+
depends_on:
42+
- zoo
43+
networks:
44+
- example
45+
46+
kafka-producer:
47+
build:
48+
dockerfile: $PWD/producer/Dockerfile
49+
context: ../../../../..
50+
command:
51+
- "/bin/sh"
52+
- "-c"
53+
- "/go/bin/producer"
54+
environment:
55+
KAFKA_PEERS: kafka:19092
56+
depends_on:
57+
- kafka
58+
networks:
59+
- example
60+
kafka-consumer:
61+
build:
62+
dockerfile: $PWD/consumer/Dockerfile
63+
context: ../../../../..
64+
command:
65+
- "/bin/sh"
66+
- "-c"
67+
- "/go/bin/consumer"
68+
environment:
69+
KAFKA_PEERS: kafka:19092
70+
depends_on:
71+
- kafka
72+
networks:
73+
- example
74+
networks:
75+
example:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package example
16+
17+
import (
18+
"log"
19+
20+
otelglobal "go.opentelemetry.io/otel/api/global"
21+
oteltracestdout "go.opentelemetry.io/otel/exporters/trace/stdout"
22+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
23+
)
24+
25+
const (
26+
KafkaTopic = "sarama-instrumentation-example"
27+
)
28+
29+
func InitTracer() {
30+
exporter, err := oteltracestdout.NewExporter(oteltracestdout.Options{PrettyPrint: true})
31+
if err != nil {
32+
log.Fatal(err)
33+
}
34+
cfg := sdktrace.Config{
35+
DefaultSampler: sdktrace.AlwaysSample(),
36+
}
37+
tp, err := sdktrace.NewProvider(
38+
sdktrace.WithConfig(cfg),
39+
sdktrace.WithSyncer(exporter),
40+
)
41+
if err != nil {
42+
log.Fatal(err)
43+
}
44+
otelglobal.SetTraceProvider(tp)
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
FROM golang:alpine AS base
15+
COPY . /src/
16+
WORKDIR /src/instrumentation/github.com/Shopify/sarama
17+
18+
FROM base AS kafka-producer
19+
RUN go install ./example/producer/producer.go
20+
CMD ["/go/bin/producer"]

0 commit comments

Comments
 (0)