Skip to content

Commit 7ebd7b5

Browse files
vmihailencojmacd
andauthored
Fix BatchSpanProcessor.Shutdown to wait until all spans are processed (#766)
* Fix BatchSpanProcessor.Shutdown to wait until all spans are processed Currently it exits too soon - before drainQueue is finished * Check bsp.stopCh to reliably drop span when batcher is stopped * Enable tests * Always use WithBlocking Co-authored-by: Joshua MacDonald <[email protected]>
1 parent c58680a commit 7ebd7b5

File tree

2 files changed

+38
-23
lines changed

2 files changed

+38
-23
lines changed

sdk/trace/batch_span_processor.go

+23-14
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
105105
queue: make(chan *export.SpanData, o.MaxQueueSize),
106106
stopCh: make(chan struct{}),
107107
}
108-
bsp.stopWait.Add(1)
109108

109+
bsp.stopWait.Add(1)
110110
go func() {
111+
defer bsp.stopWait.Done()
111112
bsp.processQueue()
112113
bsp.drainQueue()
113114
}()
@@ -130,8 +131,6 @@ func (bsp *BatchSpanProcessor) Shutdown() {
130131
bsp.stopOnce.Do(func() {
131132
close(bsp.stopCh)
132133
bsp.stopWait.Wait()
133-
close(bsp.queue)
134-
135134
})
136135
}
137136

@@ -173,7 +172,6 @@ func (bsp *BatchSpanProcessor) exportSpans() {
173172
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
174173
// waiting up to BatchTimeout to form a batch.
175174
func (bsp *BatchSpanProcessor) processQueue() {
176-
defer bsp.stopWait.Done()
177175
defer bsp.timer.Stop()
178176

179177
for {
@@ -197,13 +195,22 @@ func (bsp *BatchSpanProcessor) processQueue() {
197195
// drainQueue awaits the any caller that had added to bsp.stopWait
198196
// to finish the enqueue, then exports the final batch.
199197
func (bsp *BatchSpanProcessor) drainQueue() {
200-
for sd := range bsp.queue {
201-
bsp.batch = append(bsp.batch, sd)
202-
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
203-
bsp.exportSpans()
198+
for {
199+
select {
200+
case sd := <-bsp.queue:
201+
if sd == nil {
202+
bsp.exportSpans()
203+
return
204+
}
205+
206+
bsp.batch = append(bsp.batch, sd)
207+
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
208+
bsp.exportSpans()
209+
}
210+
default:
211+
close(bsp.queue)
204212
}
205213
}
206-
bsp.exportSpans()
207214
}
208215

209216
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
@@ -226,17 +233,19 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
226233
panic(x)
227234
}()
228235

236+
select {
237+
case <-bsp.stopCh:
238+
return
239+
default:
240+
}
241+
229242
if bsp.o.BlockOnQueueFull {
230-
select {
231-
case bsp.queue <- sd:
232-
case <-bsp.stopCh:
233-
}
243+
bsp.queue <- sd
234244
return
235245
}
236246

237247
select {
238248
case bsp.queue <- sd:
239-
case <-bsp.stopCh:
240249
default:
241250
atomic.AddUint32(&bsp.dropped, 1)
242251
}

sdk/trace/batch_span_processor_test.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
117117
sdktrace.WithBatchTimeout(schDelay),
118118
sdktrace.WithMaxQueueSize(200),
119119
sdktrace.WithMaxExportBatchSize(20),
120-
sdktrace.WithBlocking(),
121120
},
122121
wantNumSpans: 205,
123122
wantBatchCount: 11,
@@ -139,7 +138,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
139138
o: []sdktrace.BatchSpanProcessorOption{
140139
sdktrace.WithBatchTimeout(schDelay),
141140
sdktrace.WithMaxExportBatchSize(200),
142-
sdktrace.WithBlocking(),
143141
},
144142
wantNumSpans: 2000,
145143
wantBatchCount: 10,
@@ -162,18 +160,26 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
162160

163161
tp.UnregisterSpanProcessor(ssp)
164162

165-
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/741)
166-
// Restore some sort of test here.
167-
_ = option.wantNumSpans
168-
_ = option.wantBatchCount
169-
_ = te.len() // gotNumOfSpans
170-
_ = te.getBatchCount() // gotBatchCount
163+
gotNumOfSpans := te.len()
164+
if option.wantNumSpans != gotNumOfSpans {
165+
t.Errorf("number of exported span: got %+v, want %+v\n",
166+
gotNumOfSpans, option.wantNumSpans)
167+
}
168+
169+
gotBatchCount := te.getBatchCount()
170+
if gotBatchCount < option.wantBatchCount {
171+
t.Errorf("number batches: got %+v, want >= %+v\n",
172+
gotBatchCount, option.wantBatchCount)
173+
t.Errorf("Batches %v\n", te.sizes)
174+
}
171175
})
172176
}
173177
}
174178

175179
func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
176-
ssp, err := sdktrace.NewBatchSpanProcessor(te, option.o...)
180+
// Always use blocking queue to avoid flaky tests.
181+
options := append(option.o, sdktrace.WithBlocking())
182+
ssp, err := sdktrace.NewBatchSpanProcessor(te, options...)
177183
if ssp == nil {
178184
t.Errorf("%s: Error creating new instance of BatchSpanProcessor, error: %v\n", option.name, err)
179185
}

0 commit comments

Comments
 (0)