Skip to content

Remove service name as a parameter of Sarama instrumentation #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Instrumentation for the stdlib `net/http` and `net/http/httptrace` packages. (#190)
- Initial Cortex exporter. (#202, #205, #210, #211, #215)

### Changed

- Remove service name as a parameter of Sarama instrumentation. (#221)
- Replace `WithTracer` with `WithTracerProvider` in Sarama instrumentation. (#221)

### Fixed

- Bump google.golang.org/grpc from 1.30.0 to 1.31.0. (#166)
Expand Down
16 changes: 7 additions & 9 deletions instrumentation/github.com/Shopify/sarama/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(serviceName, opts...)
func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(opts...)

dispatcher := newConsumerMessagesDispatcherWrapper(pc, cfg)
go dispatcher.Run()
Expand All @@ -46,8 +46,7 @@ func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts
type consumer struct {
sarama.Consumer

serviceName string
opts []Option
opts []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
Expand All @@ -57,15 +56,14 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
if err != nil {
return nil, err
}
return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil
return WrapPartitionConsumer(pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer {
func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
serviceName: serviceName,
opts: opts,
Consumer: c,
opts: opts,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,

// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received
// message to be traced.
func WrapConsumerGroupHandler(serviceName string, handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := newConfig(serviceName, opts...)
func WrapConsumerGroupHandler(handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := newConfig(opts...)

return &consumerGroupHandler{
ConsumerGroupHandler: handler,
Expand Down
25 changes: 11 additions & 14 deletions instrumentation/github.com/Shopify/sarama/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@ import (
)

const (
serviceName = "test-service-name"
topic = "test-topic"
topic = "test-topic"
)

var (
propagators = global.Propagators()
)

func TestWrapPartitionConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock provider
provider, mt := newProviderAndTracer()

// Mock partition consumer controller
consumer := mocks.NewConsumer(t, sarama.NewConfig())
Expand All @@ -54,21 +53,21 @@ func TestWrapPartitionConsumer(t *testing.T) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
partitionConsumer = WrapPartitionConsumer(partitionConsumer, WithTraceProvider(provider))

consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func TestWrapConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock provider
provider, mt := newProviderAndTracer()

// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0)

// Wrap consumer
consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt))
consumer := WrapConsumer(mockConsumer, WithTraceProvider(provider))

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
Expand Down Expand Up @@ -107,7 +106,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
Expand All @@ -121,7 +119,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
Expand Down Expand Up @@ -159,7 +156,7 @@ func TestConsumerConsumePartitionWithError(t *testing.T) {
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockConsumer.ExpectConsumePartition(topic, 0, 0)

consumer := WrapConsumer(serviceName, mockConsumer)
consumer := WrapConsumer(mockConsumer)
_, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
// Consume twice
Expand All @@ -168,12 +165,12 @@ func TestConsumerConsumePartitionWithError(t *testing.T) {
}

func BenchmarkWrapPartitionConsumer(b *testing.B) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock provider
provider, _ := newProviderAndTracer()

mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
partitionConsumer = WrapPartitionConsumer(partitionConsumer, WithTraceProvider(provider))
message := sarama.ConsumerMessage{Key: []byte("foo")}

b.ReportAllocs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (w *consumerMessagesDispatcherWrapper) Run() {

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(w.cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
func startConsumerGroup(brokerList []string) {
consumerGroupHandler := Consumer{}
// Wrap instrumentation
handler := saramatrace.WrapConsumerGroupHandler("example-consumer", &consumerGroupHandler)
handler := saramatrace.WrapConsumerGroupHandler(&consumerGroupHandler)

config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
}

// Wrap instrumentation
producer = saramatrace.WrapAsyncProducer("example-producer", config, producer)
producer = saramatrace.WrapAsyncProducer(config, producer)

// We will log to STDOUT if we're not able to produce messages.
go func() {
Expand Down
28 changes: 16 additions & 12 deletions instrumentation/github.com/Shopify/sarama/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,37 @@ const (
)

type config struct {
ServiceName string
Tracer trace.Tracer
Propagators otelpropagation.Propagators
TraceProvider trace.Provider
Propagators otelpropagation.Propagators

Tracer trace.Tracer
}

// newConfig returns a config with all Options set.
func newConfig(serviceName string, opts ...Option) config {
cfg := config{Propagators: global.Propagators(), ServiceName: serviceName}
func newConfig(opts ...Option) config {
cfg := config{
Propagators: global.Propagators(),
TraceProvider: global.TraceProvider(),
}
for _, opt := range opts {
opt(&cfg)
}
if cfg.Tracer == nil {
cfg.Tracer = global.Tracer(defaultTracerName)
}

cfg.Tracer = cfg.TraceProvider.Tracer(defaultTracerName)

return cfg
}

// Option specifies instrumentation configuration options.
type Option func(*config)

// WithTracer specifies a tracer to use for creating spans. If none is
// specified, a tracer named
// WithTraceProvider specifies a trace provider to use for creating a tracer for spans.
// If none is specified, a tracer named
// "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
// from the global provider is used.
func WithTracer(tracer trace.Tracer) Option {
func WithTraceProvider(provider trace.Provider) Option {
return func(cfg *config) {
cfg.Tracer = tracer
cfg.TraceProvider = provider
}
}

Expand Down
38 changes: 13 additions & 25 deletions instrumentation/github.com/Shopify/sarama/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,37 @@ import (

func TestNewConfig(t *testing.T) {
testCases := []struct {
name string
serviceName string
opts []Option
expected config
name string
opts []Option
expected config
}{
{
name: "set service name",
serviceName: serviceName,
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer(defaultTracerName),
Propagators: global.Propagators(),
},
},
{
name: "with tracer",
serviceName: serviceName,
name: "with provider",
opts: []Option{
WithTracer(global.Tracer("new")),
WithTraceProvider(global.TraceProvider()),
},
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer("new"),
Propagators: global.Propagators(),
TraceProvider: global.TraceProvider(),
Tracer: global.TraceProvider().Tracer(defaultTracerName),
Propagators: global.Propagators(),
},
},
{
name: "with propagators",
serviceName: serviceName,
name: "with propagators",
opts: []Option{
WithPropagators(nil),
},
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer(defaultTracerName),
Propagators: nil,
TraceProvider: global.TraceProvider(),
Tracer: global.TraceProvider().Tracer(defaultTracerName),
Propagators: nil,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := newConfig(tc.serviceName, tc.opts...)
result := newConfig(tc.opts...)
assert.Equal(t, tc.expected, result)
})
}
Expand Down
9 changes: 4 additions & 5 deletions instrumentation/github.com/Shopify/sarama/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {

// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages
// are traced.
func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
cfg := newConfig(serviceName, opts...)
func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
cfg := newConfig(opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
Expand Down Expand Up @@ -131,8 +131,8 @@ type producerMessageContext struct {
//
// If `Return.Successes` is false, there is no way to know partition and offset of
// the message.
func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
cfg := newConfig(serviceName, opts...)
func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
cfg := newConfig(opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
Expand Down Expand Up @@ -234,7 +234,6 @@ func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.Prod

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
Expand Down
Loading