Skip to content

Commit 0cd9c55

Browse files
committed
Fix async producer will may not dispatch the last message before close
1 parent 75d24d4 commit 0cd9c55

File tree

4 files changed

+56
-26
lines changed

4 files changed

+56
-26
lines changed

instrumentation/github.com/Shopify/sarama/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ replace go.opentelemetry.io/contrib => ../../../..
66

77
require (
88
github.com/Shopify/sarama v1.26.4
9-
github.com/google/uuid v1.1.1
109
github.com/stretchr/testify v1.6.1
1110
go.opentelemetry.io/contrib v0.7.0
1211
go.opentelemetry.io/otel v0.8.0

instrumentation/github.com/Shopify/sarama/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
5050
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
5151
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
5252
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
53-
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
54-
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
5553
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
5654
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
5755
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=

instrumentation/github.com/Shopify/sarama/producer.go

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919

2020
"github.com/Shopify/sarama"
21-
"github.com/google/uuid"
2221
"google.golang.org/grpc/codes"
2322

2423
"go.opentelemetry.io/otel/api/kv"
@@ -104,16 +103,25 @@ func (p *asyncProducer) Errors() <-chan *sarama.ProducerError {
104103

105104
// AsyncClose async close producer.
106105
func (p *asyncProducer) AsyncClose() {
107-
p.close <- closeAsync
106+
p.input <- &sarama.ProducerMessage{
107+
Metadata: closeAsync,
108+
}
108109
}
109110

110111
// Close shuts down the producer and waits for any buffered messages to be
111112
// flushed.
112113
func (p *asyncProducer) Close() error {
113-
p.close <- closeSync
114+
p.input <- &sarama.ProducerMessage{
115+
Metadata: closeSync,
116+
}
114117
return <-p.closeErr
115118
}
116119

120+
type producerMessageContext struct {
121+
span trace.Span
122+
metadataBackup interface{}
123+
}
124+
117125
// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages
118126
// are traced. It requires the underlying sarama Config so we can know whether
119127
// or not successes will be returned.
@@ -135,26 +143,42 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
135143
closeErr: make(chan error),
136144
}
137145
go func() {
138-
spans := make(map[interface{}]trace.Span)
146+
producerMessageContexts := make(map[interface{}]producerMessageContext)
139147
defer close(wrapped.successes)
140148
defer close(wrapped.errors)
141149
for {
142150
select {
143-
case t := <-wrapped.close:
144-
switch t {
145-
case closeSync:
146-
go func() {
147-
wrapped.closeErr <- p.Close()
148-
}()
149-
case closeAsync:
150-
p.AsyncClose()
151-
}
152151
case msg := <-wrapped.input:
153-
msg.Metadata = uuid.New()
152+
// Shut down if message metadata is a close type.
153+
// Sarama will close after dispatching every message.
154+
// So wrapper should follow this mechanism by adding a special message at
155+
// the end of the input channel.
156+
if ct, ok := msg.Metadata.(closeType); ok {
157+
switch ct {
158+
case closeSync:
159+
go func() {
160+
wrapped.closeErr <- p.Close()
161+
}()
162+
case closeAsync:
163+
p.AsyncClose()
164+
}
165+
continue
166+
}
167+
154168
span := startProducerSpan(cfg, saramaConfig.Version, msg)
169+
170+
// Create message context, backend message metadata
171+
mc := producerMessageContext{
172+
metadataBackup: msg.Metadata,
173+
span: span,
174+
}
175+
176+
// Specific metadata with span id
177+
msg.Metadata = span.SpanContext().SpanID
178+
155179
p.Input() <- msg
156180
if saramaConfig.Producer.Return.Successes {
157-
spans[msg.Metadata] = span
181+
producerMessageContexts[msg.Metadata] = mc
158182
} else {
159183
// If returning successes isn't enabled, we just finish the
160184
// span right away because there's no way to know when it will
@@ -167,9 +191,12 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
167191
return
168192
}
169193
key := msg.Metadata
170-
if span, ok := spans[key]; ok {
171-
delete(spans, key)
172-
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
194+
if mc, ok := producerMessageContexts[key]; ok {
195+
delete(producerMessageContexts, key)
196+
finishProducerSpan(mc.span, msg.Partition, msg.Offset, nil)
197+
198+
// Restore message metadata
199+
msg.Metadata = mc.metadataBackup
173200
}
174201
wrapped.successes <- msg
175202
case err, ok := <-p.Errors():
@@ -178,9 +205,9 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
178205
return
179206
}
180207
key := err.Msg.Metadata
181-
if span, ok := spans[key]; ok {
182-
delete(spans, key)
183-
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
208+
if mc, ok := producerMessageContexts[key]; ok {
209+
delete(producerMessageContexts, key)
210+
finishProducerSpan(mc.span, err.Msg.Partition, err.Msg.Offset, err.Err)
184211
}
185212
wrapped.errors <- err
186213
}

instrumentation/github.com/Shopify/sarama/producer_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,13 @@ func TestWrapAsyncProducer(t *testing.T) {
235235

236236
msgList := createMessages(mt)
237237
// Send message
238-
for _, msg := range msgList {
238+
for i, msg := range msgList {
239239
mockAsyncProducer.ExpectInputAndSucceed()
240+
// Add metadata to msg
241+
msg.Metadata = i
240242
ap.Input() <- msg
241-
<-ap.Successes()
243+
newMsg := <-ap.Successes()
244+
assert.Equal(t, newMsg, msg)
242245
}
243246

244247
err := ap.Close()
@@ -289,6 +292,9 @@ func TestWrapAsyncProducer(t *testing.T) {
289292
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
290293
}
291294

295+
// Check metadata
296+
assert.Equal(t, i, msg.Metadata)
297+
292298
// Check tracing propagation
293299
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg)))
294300
assert.True(t, remoteSpanFromMessage.IsValid())

0 commit comments

Comments
 (0)