Skip to content

Commit 4fd08b6

Browse files
committed
Fix async producer will may not dispatch the last message before close
1 parent 75d24d4 commit 4fd08b6

File tree

4 files changed

+56
-28
lines changed

4 files changed

+56
-28
lines changed

instrumentation/github.com/Shopify/sarama/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ replace go.opentelemetry.io/contrib => ../../../..
66

77
require (
88
github.com/Shopify/sarama v1.26.4
9-
github.com/google/uuid v1.1.1
109
github.com/stretchr/testify v1.6.1
1110
go.opentelemetry.io/contrib v0.7.0
1211
go.opentelemetry.io/otel v0.8.0

instrumentation/github.com/Shopify/sarama/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
5050
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
5151
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
5252
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
53-
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
54-
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
5553
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
5654
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
5755
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=

instrumentation/github.com/Shopify/sarama/producer.go

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919

2020
"github.com/Shopify/sarama"
21-
"github.com/google/uuid"
2221
"google.golang.org/grpc/codes"
2322

2423
"go.opentelemetry.io/otel/api/kv"
@@ -83,7 +82,6 @@ type asyncProducer struct {
8382
input chan *sarama.ProducerMessage
8483
successes chan *sarama.ProducerMessage
8584
errors chan *sarama.ProducerError
86-
close chan closeType
8785
closeErr chan error
8886
}
8987

@@ -104,16 +102,25 @@ func (p *asyncProducer) Errors() <-chan *sarama.ProducerError {
104102

105103
// AsyncClose async close producer.
106104
func (p *asyncProducer) AsyncClose() {
107-
p.close <- closeAsync
105+
p.input <- &sarama.ProducerMessage{
106+
Metadata: closeAsync,
107+
}
108108
}
109109

110110
// Close shuts down the producer and waits for any buffered messages to be
111111
// flushed.
112112
func (p *asyncProducer) Close() error {
113-
p.close <- closeSync
113+
p.input <- &sarama.ProducerMessage{
114+
Metadata: closeSync,
115+
}
114116
return <-p.closeErr
115117
}
116118

119+
type producerMessageContext struct {
120+
span trace.Span
121+
metadataBackup interface{}
122+
}
123+
117124
// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages
118125
// are traced. It requires the underlying sarama Config so we can know whether
119126
// or not successes will be returned.
@@ -131,30 +138,45 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
131138
input: make(chan *sarama.ProducerMessage),
132139
successes: make(chan *sarama.ProducerMessage),
133140
errors: make(chan *sarama.ProducerError),
134-
close: make(chan closeType),
135141
closeErr: make(chan error),
136142
}
137143
go func() {
138-
spans := make(map[interface{}]trace.Span)
144+
producerMessageContexts := make(map[interface{}]producerMessageContext)
139145
defer close(wrapped.successes)
140146
defer close(wrapped.errors)
141147
for {
142148
select {
143-
case t := <-wrapped.close:
144-
switch t {
145-
case closeSync:
146-
go func() {
147-
wrapped.closeErr <- p.Close()
148-
}()
149-
case closeAsync:
150-
p.AsyncClose()
151-
}
152149
case msg := <-wrapped.input:
153-
msg.Metadata = uuid.New()
150+
// Shut down if message metadata is a close type.
151+
// Sarama will close after dispatching every message.
152+
// So wrapper should follow this mechanism by adding a special message at
153+
// the end of the input channel.
154+
if ct, ok := msg.Metadata.(closeType); ok {
155+
switch ct {
156+
case closeSync:
157+
go func() {
158+
wrapped.closeErr <- p.Close()
159+
}()
160+
case closeAsync:
161+
p.AsyncClose()
162+
}
163+
continue
164+
}
165+
154166
span := startProducerSpan(cfg, saramaConfig.Version, msg)
167+
168+
// Create message context, backend message metadata
169+
mc := producerMessageContext{
170+
metadataBackup: msg.Metadata,
171+
span: span,
172+
}
173+
174+
// Specific metadata with span id
175+
msg.Metadata = span.SpanContext().SpanID
176+
155177
p.Input() <- msg
156178
if saramaConfig.Producer.Return.Successes {
157-
spans[msg.Metadata] = span
179+
producerMessageContexts[msg.Metadata] = mc
158180
} else {
159181
// If returning successes isn't enabled, we just finish the
160182
// span right away because there's no way to know when it will
@@ -167,9 +189,12 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
167189
return
168190
}
169191
key := msg.Metadata
170-
if span, ok := spans[key]; ok {
171-
delete(spans, key)
172-
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
192+
if mc, ok := producerMessageContexts[key]; ok {
193+
delete(producerMessageContexts, key)
194+
finishProducerSpan(mc.span, msg.Partition, msg.Offset, nil)
195+
196+
// Restore message metadata
197+
msg.Metadata = mc.metadataBackup
173198
}
174199
wrapped.successes <- msg
175200
case err, ok := <-p.Errors():
@@ -178,9 +203,9 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
178203
return
179204
}
180205
key := err.Msg.Metadata
181-
if span, ok := spans[key]; ok {
182-
delete(spans, key)
183-
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
206+
if mc, ok := producerMessageContexts[key]; ok {
207+
delete(producerMessageContexts, key)
208+
finishProducerSpan(mc.span, err.Msg.Partition, err.Msg.Offset, err.Err)
184209
}
185210
wrapped.errors <- err
186211
}

instrumentation/github.com/Shopify/sarama/producer_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,13 @@ func TestWrapAsyncProducer(t *testing.T) {
235235

236236
msgList := createMessages(mt)
237237
// Send message
238-
for _, msg := range msgList {
238+
for i, msg := range msgList {
239239
mockAsyncProducer.ExpectInputAndSucceed()
240+
// Add metadata to msg
241+
msg.Metadata = i
240242
ap.Input() <- msg
241-
<-ap.Successes()
243+
newMsg := <-ap.Successes()
244+
assert.Equal(t, newMsg, msg)
242245
}
243246

244247
err := ap.Close()
@@ -289,6 +292,9 @@ func TestWrapAsyncProducer(t *testing.T) {
289292
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
290293
}
291294

295+
// Check metadata
296+
assert.Equal(t, i, msg.Metadata)
297+
292298
// Check tracing propagation
293299
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg)))
294300
assert.True(t, remoteSpanFromMessage.IsValid())

0 commit comments

Comments
 (0)