Skip to content

Commit e367eb1

Browse files
[exporterhelper] Add span links across batcher when merging multiple requests (#12768)
#### Description Continuation of #12318: - Small change to avoid adding both a parent span relationship *and* a span link in cases where no merge is made - Fix failing tests by removing them: Those tests relate to cancelling a batch of export requests if the context for one of them is cancelled. I'm not sure how useful this logic is, or if it makes sense to cancel unrelated requests that happened to be batched with the one that was cancelled. If there are objections to this, I can try to reimplement this logic. #### Link to tracking issue Updates #12212 (remaining: persist parent span across persistent queue) (edit by @mx-psi): - Fixes #11140 - Fixes #11141 --------- Co-authored-by: Sindy Li <[email protected]>
1 parent 568c3ce commit e367eb1

File tree

7 files changed

+120
-77
lines changed

7 files changed

+120
-77
lines changed

.chloggen/merged_context.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Link batcher context to all batched request's span contexts.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12212, 8122]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterhelper/internal/obs_report_sender.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/component"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1920
"go.opentelemetry.io/collector/pipeline"
@@ -96,7 +97,10 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
9697
// StartOp creates the span used to trace the operation. Returning
9798
// the updated context and the created span.
9899
func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
99-
ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs)
100+
ctx, _ = ors.tracer.Start(ctx,
101+
ors.spanName,
102+
ors.spanAttrs,
103+
trace.WithLinks(queuebatch.LinksFromContext(ctx)...))
100104
return ctx
101105
}
102106

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/trace"
10+
)
11+
12+
type traceContextKeyType int
13+
14+
const batchSpanLinksKey traceContextKeyType = iota
15+
16+
// LinksFromContext returns a list of trace links registered in the context.
17+
func LinksFromContext(ctx context.Context) []trace.Link {
18+
if ctx == nil {
19+
return []trace.Link{}
20+
}
21+
if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
22+
return links
23+
}
24+
return []trace.Link{}
25+
}
26+
27+
func parentsFromContext(ctx context.Context) []trace.Link {
28+
if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() {
29+
return []trace.Link{{SpanContext: spanCtx}}
30+
}
31+
return LinksFromContext(ctx)
32+
}
33+
34+
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
35+
return context.WithValue(
36+
context.Background(),
37+
batchSpanLinksKey,
38+
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...),
39+
)
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
"go.opentelemetry.io/otel/trace"
12+
13+
"go.opentelemetry.io/collector/component/componenttest"
14+
)
15+
16+
func TestBatchContextLink(t *testing.T) {
17+
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
18+
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
19+
20+
ctx1 := context.Background()
21+
22+
ctx2, span2 := tracer.Start(ctx1, "span2")
23+
defer span2.End()
24+
25+
ctx3, span3 := tracer.Start(ctx1, "span3")
26+
defer span3.End()
27+
28+
ctx4, span4 := tracer.Start(ctx1, "span4")
29+
defer span4.End()
30+
31+
batchContext := contextWithMergedLinks(ctx2, ctx3)
32+
batchContext = contextWithMergedLinks(batchContext, ctx4)
33+
34+
actualLinks := LinksFromContext(batchContext)
35+
require.Len(t, actualLinks, 3)
36+
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
37+
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
38+
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
39+
}

exporter/exporterhelper/internal/queuebatch/default_batcher.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
125125
// - Last result may not have enough data to be flushed.
126126

127127
// Logic on how to deal with the current batch:
128-
// TODO: Deal with merging Context.
129128
qb.currentBatch.req = reqList[0]
130129
qb.currentBatch.done = append(qb.currentBatch.done, done)
130+
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
131+
131132
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
132133
// cannot unlock and re-lock because we are not done processing all the responses.
133134
var firstBatch *batch

exporter/exporterhelper/internal/queuebatch/obs_queue.go

+9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"go.opentelemetry.io/otel/attribute"
1010
"go.opentelemetry.io/otel/metric"
11+
"go.opentelemetry.io/otel/trace"
1112

1213
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1314
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -28,6 +29,7 @@ type obsQueue[T request.Request] struct {
2829
tb *metadata.TelemetryBuilder
2930
metricAttr metric.MeasurementOption
3031
enqueueFailedInst metric.Int64Counter
32+
tracer trace.Tracer
3133
}
3234

3335
func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) {
@@ -54,10 +56,13 @@ func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T
5456
return nil, err
5557
}
5658

59+
tracer := metadata.Tracer(set.Telemetry)
60+
5761
or := &obsQueue[T]{
5862
Queue: delegate,
5963
tb: tb,
6064
metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)),
65+
tracer: tracer,
6166
}
6267

6368
switch set.Signal {
@@ -81,7 +86,11 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
8186
// Have to read the number of items before sending the request since the request can
8287
// be modified by the downstream components like the batcher.
8388
numItems := req.ItemsCount()
89+
90+
ctx, span := or.tracer.Start(ctx, "exporter/enqueue")
8491
err := or.Queue.Offer(ctx, req)
92+
span.End()
93+
8594
// No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available.
8695
if err != nil && or.enqueueFailedInst != nil {
8796
or.enqueueFailedInst.Add(ctx, int64(numItems), or.metricAttr)

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

-75
Original file line numberDiff line numberDiff line change
@@ -449,40 +449,6 @@ func TestQueueBatch_BatchBlocking(t *testing.T) {
449449
require.NoError(t, qb.Shutdown(context.Background()))
450450
}
451451

452-
// Validate that the batch is cancelled once the first request in the request is cancelled
453-
func TestQueueBatch_BatchCancelled(t *testing.T) {
454-
sink := requesttest.NewSink()
455-
cfg := newTestConfig()
456-
cfg.WaitForResult = true
457-
cfg.Batch.MinSize = 2
458-
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export)
459-
require.NoError(t, err)
460-
require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))
461-
462-
// send 2 blockOnOverflow requests
463-
wg := sync.WaitGroup{}
464-
ctx, cancel := context.WithCancel(context.Background())
465-
wg.Add(1)
466-
go func() {
467-
defer wg.Done()
468-
assert.ErrorIs(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled)
469-
}()
470-
wg.Add(1)
471-
go func() {
472-
defer wg.Done()
473-
time.Sleep(100 * time.Millisecond) // ensure this call is the second
474-
assert.ErrorIs(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled)
475-
}()
476-
cancel() // canceling the first request should cancel the whole batch
477-
wg.Wait()
478-
479-
// nothing should be delivered
480-
assert.Equal(t, 0, sink.RequestsCount())
481-
assert.Equal(t, 0, sink.ItemsCount())
482-
483-
require.NoError(t, qb.Shutdown(context.Background()))
484-
}
485-
486452
func TestQueueBatch_DrainActiveRequests(t *testing.T) {
487453
sink := requesttest.NewSink()
488454
cfg := newTestConfig()
@@ -514,47 +480,6 @@ func TestQueueBatch_DrainActiveRequests(t *testing.T) {
514480
assert.Equal(t, 3, sink.ItemsCount())
515481
}
516482

517-
func TestQueueBatchWithTimeout(t *testing.T) {
518-
sink := requesttest.NewSink()
519-
cfg := newTestConfig()
520-
cfg.WaitForResult = true
521-
cfg.Batch.MinSize = 10
522-
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export)
523-
require.NoError(t, err)
524-
require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))
525-
526-
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
527-
defer cancel()
528-
// Send 3 concurrent requests that should be merged in one batch
529-
wg := sync.WaitGroup{}
530-
for i := 0; i < 3; i++ {
531-
wg.Add(1)
532-
go func() {
533-
assert.NoError(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4}))
534-
wg.Done()
535-
}()
536-
}
537-
wg.Wait()
538-
assert.Equal(t, 1, sink.RequestsCount())
539-
assert.Equal(t, 12, sink.ItemsCount())
540-
541-
// 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender
542-
for i := 0; i < 3; i++ {
543-
wg.Add(1)
544-
go func() {
545-
assert.Error(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4, Delay: 30 * time.Millisecond}))
546-
wg.Done()
547-
}()
548-
}
549-
wg.Wait()
550-
551-
require.NoError(t, qb.Shutdown(context.Background()))
552-
553-
// The sink should not change
554-
assert.Equal(t, 1, sink.RequestsCount())
555-
assert.Equal(t, 12, sink.ItemsCount())
556-
}
557-
558483
func TestQueueBatchTimerResetNoConflict(t *testing.T) {
559484
sink := requesttest.NewSink()
560485
cfg := newTestConfig()

0 commit comments

Comments
 (0)