-
Notifications
You must be signed in to change notification settings - Fork 673
Add instrumentation for Kafka #134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
lizthegrey
merged 20 commits into
open-telemetry:master
from
XSAM:feature/kafka-instrumentation
Jul 23, 2020
Merged
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
e108e29
Add instrumentation for Kafka sarama library
XSAM 77c933e
Add links and status message for mock span
XSAM 5eaaf17
Add test for sarama consumer
XSAM 3adea50
Add test for sarama message and option
XSAM 312d6c7
Add test for sarama producer
XSAM 06462e5
Update changelog
XSAM 76845d6
Merge branch 'master' into feature/kafka-instrumentation
lizthegrey 7ed4ac5
Fix producer fail to inject tracing info into message header
XSAM 6cd294b
Fix async producer cannot return successes once been closed
XSAM 8e48209
Fix consumer cannot end the span of the last message
XSAM 63e7c1e
Create consumer span with remote parent span
XSAM 75d24d4
Add benchmarks for sarama instrumentation
XSAM 253bc31
Fix async producer may not dispatch the last message before close
XSAM b120108
Fix async producer may not close some spans while closing
XSAM 7c727b9
Fix wrong doc description about span propagation
XSAM b87bc93
Fix wrong `messaging.message_id` value type
XSAM 3929fbf
Replace some `assert` with `require`
XSAM f8e5d25
Update styles and the version of OTel
XSAM 6428e3b
Update dependabot to include instrumentation of Sarama
XSAM 7c24dde
Merge remote-tracking branch 'upstream/master' into feature/kafka-ins…
XSAM File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
instrumentation/github.com/Shopify/sarama/consumer.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package sarama | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/Shopify/sarama" | ||
|
||
"go.opentelemetry.io/otel/api/kv" | ||
"go.opentelemetry.io/otel/api/propagation" | ||
"go.opentelemetry.io/otel/api/standard" | ||
"go.opentelemetry.io/otel/api/trace" | ||
) | ||
|
||
type partitionConsumer struct { | ||
sarama.PartitionConsumer | ||
messages chan *sarama.ConsumerMessage | ||
} | ||
|
||
// Messages returns the read channel for the messages that are returned by | ||
// the broker. | ||
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { | ||
return pc.messages | ||
} | ||
|
||
// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received | ||
// message to be traced. | ||
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { | ||
cfg := newConfig(serviceName, opts...) | ||
|
||
wrapped := &partitionConsumer{ | ||
PartitionConsumer: pc, | ||
messages: make(chan *sarama.ConsumerMessage), | ||
} | ||
go func() { | ||
msgs := pc.Messages() | ||
|
||
var prevSpan trace.Span | ||
for msg := range msgs { | ||
// Extract a span context from message to link. | ||
carrier := NewConsumerMessageCarrier(msg) | ||
parentSpanContext := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)) | ||
|
||
// Create a span. | ||
attrs := []kv.KeyValue{ | ||
standard.ServiceNameKey.String(cfg.ServiceName), | ||
standard.MessagingSystemKey.String("kafka"), | ||
standard.MessagingDestinationKindKeyTopic, | ||
standard.MessagingDestinationKey.String(msg.Topic), | ||
standard.MessagingOperationReceive, | ||
standard.MessagingMessageIDKey.Int64(msg.Offset), | ||
XSAM marked this conversation as resolved.
Show resolved
Hide resolved
|
||
kafkaPartitionKey.Int32(msg.Partition), | ||
} | ||
opts := []trace.StartOption{ | ||
trace.WithAttributes(attrs...), | ||
trace.WithSpanKind(trace.SpanKindConsumer), | ||
} | ||
if parentSpanContext.IsValid() { | ||
opts = append(opts, trace.LinkedTo(parentSpanContext)) | ||
} | ||
newCtx, span := cfg.Tracer.Start(context.Background(), "kafka.consume", opts...) | ||
|
||
// Inject current span context, so consumers can use it to propagate span. | ||
propagation.InjectHTTP(newCtx, cfg.Propagators, carrier) | ||
|
||
// Send messages back to user. | ||
wrapped.messages <- msg | ||
|
||
// Finish the previous span. | ||
if prevSpan != nil { | ||
prevSpan.End() | ||
} | ||
prevSpan = span | ||
} | ||
// Finish any remaining span. | ||
if prevSpan != nil { | ||
prevSpan.End() | ||
} | ||
close(wrapped.messages) | ||
}() | ||
return wrapped | ||
} | ||
|
||
type consumer struct { | ||
sarama.Consumer | ||
|
||
serviceName string | ||
opts []Option | ||
} | ||
|
||
// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting | ||
// PartitionConsumer. | ||
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { | ||
pc, err := c.Consumer.ConsumePartition(topic, partition, offset) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil | ||
} | ||
|
||
// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created | ||
// via Consumer.ConsumePartition. | ||
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer { | ||
return &consumer{ | ||
Consumer: c, | ||
serviceName: serviceName, | ||
opts: opts, | ||
} | ||
} |
170 changes: 170 additions & 0 deletions
170
instrumentation/github.com/Shopify/sarama/consumer_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package sarama | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/Shopify/sarama" | ||
"github.com/Shopify/sarama/mocks" | ||
"github.com/stretchr/testify/assert" | ||
|
||
"go.opentelemetry.io/otel/api/global" | ||
"go.opentelemetry.io/otel/api/kv" | ||
"go.opentelemetry.io/otel/api/propagation" | ||
"go.opentelemetry.io/otel/api/standard" | ||
"go.opentelemetry.io/otel/api/trace" | ||
|
||
mocktracer "go.opentelemetry.io/contrib/internal/trace" | ||
) | ||
|
||
const ( | ||
serviceName = "test-service-name" | ||
topic = "test-topic" | ||
) | ||
|
||
var ( | ||
propagators = global.Propagators() | ||
) | ||
|
||
func TestWrapPartitionConsumer(t *testing.T) { | ||
// Mock tracer | ||
mt := mocktracer.NewTracer("kafka") | ||
|
||
// Mock partition consumer controller | ||
consumer := mocks.NewConsumer(t, sarama.NewConfig()) | ||
mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0) | ||
|
||
// Create partition consumer | ||
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) | ||
assert.NoError(t, err) | ||
XSAM marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt)) | ||
|
||
consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer) | ||
} | ||
|
||
func TestWrapConsumer(t *testing.T) { | ||
// Mock tracer | ||
mt := mocktracer.NewTracer("kafka") | ||
|
||
// Mock partition consumer controller | ||
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig()) | ||
mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0) | ||
|
||
// Wrap consumer | ||
consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt)) | ||
|
||
// Create partition consumer | ||
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) | ||
assert.NoError(t, err) | ||
XSAM marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer) | ||
} | ||
|
||
func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer *mocks.PartitionConsumer, partitionConsumer sarama.PartitionConsumer) { | ||
// Create message with span context | ||
ctx, _ := mt.Start(context.Background(), "") | ||
message := sarama.ConsumerMessage{Key: []byte("foo")} | ||
propagation.InjectHTTP(ctx, propagators, NewConsumerMessageCarrier(&message)) | ||
|
||
// Produce message | ||
mockPartitionConsumer.YieldMessage(&message) | ||
mockPartitionConsumer.YieldMessage(&sarama.ConsumerMessage{Key: []byte("foo2")}) | ||
|
||
// Consume messages | ||
msgList := make([]*sarama.ConsumerMessage, 2) | ||
msgList[0] = <-partitionConsumer.Messages() | ||
msgList[1] = <-partitionConsumer.Messages() | ||
assert.NoError(t, partitionConsumer.Close()) | ||
// Wait for the channel to be closed | ||
<-partitionConsumer.Messages() | ||
|
||
// Check spans length | ||
spans := mt.EndedSpans() | ||
assert.Len(t, spans, 2) | ||
|
||
expectedList := []struct { | ||
kvList []kv.KeyValue | ||
links map[trace.SpanContext][]kv.KeyValue | ||
kind trace.SpanKind | ||
msgKey []byte | ||
}{ | ||
{ | ||
kvList: []kv.KeyValue{ | ||
standard.ServiceNameKey.String(serviceName), | ||
standard.MessagingSystemKey.String("kafka"), | ||
standard.MessagingDestinationKindKeyTopic, | ||
standard.MessagingDestinationKey.String("test-topic"), | ||
standard.MessagingOperationReceive, | ||
standard.MessagingMessageIDKey.Int64(1), | ||
kafkaPartitionKey.Int32(0), | ||
}, | ||
links: map[trace.SpanContext][]kv.KeyValue{ | ||
trace.SpanFromContext(ctx).SpanContext(): nil, | ||
}, | ||
kind: trace.SpanKindConsumer, | ||
msgKey: []byte("foo"), | ||
}, | ||
{ | ||
kvList: []kv.KeyValue{ | ||
standard.ServiceNameKey.String(serviceName), | ||
standard.MessagingSystemKey.String("kafka"), | ||
standard.MessagingDestinationKindKeyTopic, | ||
standard.MessagingDestinationKey.String("test-topic"), | ||
standard.MessagingOperationReceive, | ||
standard.MessagingMessageIDKey.Int64(2), | ||
kafkaPartitionKey.Int32(0), | ||
}, | ||
links: make(map[trace.SpanContext][]kv.KeyValue), | ||
kind: trace.SpanKindConsumer, | ||
msgKey: []byte("foo2"), | ||
}, | ||
} | ||
|
||
for i, expected := range expectedList { | ||
t.Run(fmt.Sprint("index", i), func(t *testing.T) { | ||
span := spans[i] | ||
|
||
assert.Equal(t, expected.links, span.Links) | ||
|
||
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i]))) | ||
assert.Equal(t, span.SpanContext(), remoteSpanFromMessage, | ||
"span context should be injected into the consumer message headers") | ||
|
||
assert.Equal(t, "kafka.consume", span.Name) | ||
assert.Equal(t, expected.kind, span.Kind) | ||
assert.Equal(t, expected.msgKey, msgList[i].Key) | ||
for _, k := range expected.kvList { | ||
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestConsumer_ConsumePartitionWithError(t *testing.T) { | ||
lizthegrey marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Mock partition consumer controller | ||
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig()) | ||
mockConsumer.ExpectConsumePartition(topic, 0, 0) | ||
|
||
consumer := WrapConsumer(serviceName, mockConsumer) | ||
_, err := consumer.ConsumePartition(topic, 0, 0) | ||
assert.NoError(t, err) | ||
// Consume twice | ||
_, err = consumer.ConsumePartition(topic, 0, 0) | ||
assert.Error(t, err) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package sarama provides functions to trace the Shopify/sarama package. (https://github.com/Shopify/sarama) | ||
// | ||
// The consumer's span will not be created as a child of the producer's span; instead, it will link the producer's span. | ||
// (https://github.com/open-telemetry/opentelemetry-specification/blob/v0.6.0/specification/trace/semantic_conventions/messaging.md#batch-receiving) | ||
// | ||
// Context propagation only works on Kafka versions higher than 0.11.0.0 which supports record headers. | ||
// (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html) | ||
// | ||
// Based on: https://github.com/DataDog/dd-trace-go/tree/v1/contrib/Shopify/sarama | ||
package sarama // import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama | ||
|
||
go 1.14 | ||
|
||
replace go.opentelemetry.io/contrib => ../../../.. | ||
|
||
require ( | ||
github.com/Shopify/sarama v1.26.4 | ||
github.com/google/uuid v1.1.1 | ||
github.com/stretchr/testify v1.6.1 | ||
go.opentelemetry.io/contrib v0.7.0 | ||
XSAM marked this conversation as resolved.
Show resolved
Hide resolved
|
||
go.opentelemetry.io/otel v0.8.0 | ||
google.golang.org/grpc v1.30.0 | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.