Skip to content

Commit 1861c97

Browse files
committed
[exporterhelper] Introduce batching functionality
This change introduces new experimental batching functionality to the exporter helper
1 parent 4ae8d2b commit 1861c97

File tree

9 files changed

+933
-12
lines changed

9 files changed

+933
-12
lines changed

.chloggen/batch-exporter-helper.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add experimental batching capabilities to the exporter helper
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [8122]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"
5+
6+
import "context"
7+
8+
// BatchMergeFunc is a function that merges two requests into a single request.
// Implementations must not mutate the requests passed to the function if an error can be returned after
// the mutation or if the exporter is marked as not mutable.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc[T any] func(context.Context, T, T) (T, error)
14+
15+
// BatchMergeSplitFunc is a function that merge and/or splits one or two requests into multiple requests based on the
16+
// configured limit provided in MaxSizeConfig.
17+
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
18+
// Size of the last returned request MUST be less or equal than the size of any other returned request.
19+
// The original request MUST not be mutated if error is returned after mutation or if the exporter is
20+
// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil,
21+
// make sure to check it before using.
22+
// This API is at the early stage of development and may change without backward compatibility
23+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
24+
type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error)

exporter/exporterbatcher/config.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"
5+
6+
import (
7+
"errors"
8+
"time"
9+
)
10+
11+
// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
12+
// MaxSizeItems defines batch splitting functionality if it's more than zero.
13+
// This API is at the early stage of development and may change without backward compatibility
14+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
15+
type Config struct {
16+
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
17+
Enabled bool `mapstructure:"enabled"`
18+
19+
// FlushTimeout sets the time after which a batch will be sent regardless of its size.
20+
FlushTimeout time.Duration `mapstructure:"flush_timeout"`
21+
22+
MinSizeConfig `mapstructure:",squash"`
23+
MaxSizeConfig `mapstructure:",squash"`
24+
}
25+
26+
// MinSizeConfig defines the configuration for the minimum number of items in a batch.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MinSizeConfig struct {
	// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch
	// should be sent regardless of the timeout. There is no guarantee that the batch size is always greater
	// than this value.
	// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will
	// be ignored.
	MinSizeItems int `mapstructure:"min_size_items"`
}
35+
36+
// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MaxSizeConfig struct {
	// MaxSizeItems is the maximum number of items in a batch, i.e. spans, data points or log records
	// for OTLP. If the batch size exceeds this value, it will be broken up into smaller batches if possible.
	// Setting this value to zero disables the maximum size limit.
	MaxSizeItems int `mapstructure:"max_size_items"`
}
45+
46+
func (c Config) Validate() error {
47+
if c.MinSizeItems < 0 {
48+
return errors.New("min_size_items must be greater than or equal to zero")
49+
}
50+
if c.MaxSizeItems < 0 {
51+
return errors.New("max_size_items must be greater than or equal to zero")
52+
}
53+
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
54+
return errors.New("max_size_items must be greater than or equal to min_size_items")
55+
}
56+
if c.FlushTimeout <= 0 {
57+
return errors.New("timeout must be greater than zero")
58+
}
59+
return nil
60+
}
61+
62+
func NewDefaultConfig() Config {
63+
return Config{
64+
Enabled: true,
65+
FlushTimeout: 200 * time.Millisecond,
66+
MinSizeConfig: MinSizeConfig{
67+
MinSizeItems: 8192,
68+
},
69+
}
70+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterbatcher
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestConfig_Validate(t *testing.T) {
13+
cfg := NewDefaultConfig()
14+
assert.NoError(t, cfg.Validate())
15+
16+
cfg.MinSizeItems = -1
17+
assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")
18+
19+
cfg = NewDefaultConfig()
20+
cfg.FlushTimeout = 0
21+
assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero")
22+
23+
cfg.MaxSizeItems = -1
24+
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")
25+
26+
cfg = NewDefaultConfig()
27+
cfg.MaxSizeItems = 20000
28+
cfg.MinSizeItems = 20001
29+
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
30+
}
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
12+
"go.uber.org/zap"
13+
14+
"go.opentelemetry.io/collector/component"
15+
"go.opentelemetry.io/collector/exporter"
16+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
17+
)
18+
19+
// batchSender is a component that accepts places requests into batches before passing them to the downstream senders.
20+
//
21+
// batch_processor implements consumer.Traces and consumer.Metrics
22+
//
23+
// Batches are sent out with any of the following conditions:
24+
// - batch size reaches cfg.SendBatchSize
25+
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
26+
type batchSender struct {
27+
baseRequestSender
28+
cfg exporterbatcher.Config
29+
mergeFunc exporterbatcher.BatchMergeFunc[Request]
30+
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]
31+
32+
// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
33+
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
34+
// Populated from the number of queue consumers if queue is enabled.
35+
concurrencyLimit uint64
36+
activeRequests atomic.Uint64
37+
38+
resetTimerCh chan struct{}
39+
40+
mu sync.Mutex
41+
activeBatch *batch
42+
43+
logger *zap.Logger
44+
45+
shutdownCh chan struct{}
46+
stopped *atomic.Bool
47+
}
48+
49+
// newBatchSender returns a new batch consumer component.
50+
func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender {
51+
bs := &batchSender{
52+
activeBatch: newEmptyBatch(),
53+
cfg: cfg,
54+
logger: set.Logger,
55+
shutdownCh: make(chan struct{}),
56+
stopped: &atomic.Bool{},
57+
resetTimerCh: make(chan struct{}),
58+
}
59+
return bs
60+
}
61+
62+
func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
63+
timer := time.NewTimer(bs.cfg.FlushTimeout)
64+
go func() {
65+
for {
66+
select {
67+
case <-bs.shutdownCh:
68+
bs.mu.Lock()
69+
if bs.activeBatch.request != nil {
70+
bs.exportActiveBatch()
71+
}
72+
bs.mu.Unlock()
73+
if !timer.Stop() {
74+
<-timer.C
75+
}
76+
return
77+
case <-timer.C:
78+
bs.mu.Lock()
79+
if bs.activeBatch.request != nil {
80+
bs.exportActiveBatch()
81+
}
82+
bs.mu.Unlock()
83+
timer.Reset(bs.cfg.FlushTimeout)
84+
case <-bs.resetTimerCh:
85+
if !timer.Stop() {
86+
<-timer.C
87+
}
88+
timer.Reset(bs.cfg.FlushTimeout)
89+
}
90+
}
91+
}()
92+
93+
return nil
94+
}
95+
96+
type batch struct {
97+
ctx context.Context
98+
request Request
99+
done chan struct{}
100+
err error
101+
}
102+
103+
func newEmptyBatch() *batch {
104+
return &batch{
105+
ctx: context.Background(),
106+
done: make(chan struct{}),
107+
}
108+
}
109+
110+
// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
111+
// Caller must hold the lock.
112+
func (bs *batchSender) exportActiveBatch() {
113+
go func(b *batch) {
114+
b.err = b.request.Export(b.ctx)
115+
close(b.done)
116+
}(bs.activeBatch)
117+
bs.activeBatch = newEmptyBatch()
118+
}
119+
120+
// isActiveBatchReady returns true if the active batch is ready to be exported.
121+
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
122+
// Caller must hold the lock.
123+
func (bs *batchSender) isActiveBatchReady() bool {
124+
return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems ||
125+
(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
126+
}
127+
128+
func (bs *batchSender) send(ctx context.Context, req Request) error {
129+
// Stopped batch sender should act as pass-through to allow the queue to be drained.
130+
if bs.stopped.Load() {
131+
return bs.nextSender.send(ctx, req)
132+
}
133+
134+
bs.activeRequests.Add(1)
135+
defer bs.activeRequests.Add(^uint64(0))
136+
137+
if bs.cfg.MaxSizeItems > 0 {
138+
return bs.sendMergeSplitBatch(ctx, req)
139+
}
140+
return bs.sendMergeBatch(ctx, req)
141+
}
142+
143+
// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
144+
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
145+
bs.mu.Lock()
146+
147+
reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
148+
if err != nil || len(reqs) == 0 {
149+
bs.mu.Unlock()
150+
return err
151+
}
152+
if len(reqs) == 1 || bs.activeBatch.request != nil {
153+
bs.updateActiveBatch(ctx, reqs[0])
154+
batch := bs.activeBatch
155+
if bs.isActiveBatchReady() || len(reqs) > 1 {
156+
bs.exportActiveBatch()
157+
bs.resetTimerCh <- struct{}{}
158+
}
159+
bs.mu.Unlock()
160+
<-batch.done
161+
if batch.err != nil {
162+
return batch.err
163+
}
164+
reqs = reqs[1:]
165+
} else {
166+
bs.mu.Unlock()
167+
}
168+
169+
// Intentionally do not put the last request in the active batch to not block it.
170+
// TODO: Consider including the partial request in the error to avoid double publishing.
171+
for _, r := range reqs {
172+
if err := r.Export(ctx); err != nil {
173+
return err
174+
}
175+
}
176+
return nil
177+
}
178+
179+
// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
180+
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
181+
bs.mu.Lock()
182+
if bs.activeBatch.request != nil {
183+
var err error
184+
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
185+
if err != nil {
186+
bs.mu.Unlock()
187+
return err
188+
}
189+
}
190+
bs.updateActiveBatch(ctx, req)
191+
batch := bs.activeBatch
192+
if bs.isActiveBatchReady() {
193+
bs.exportActiveBatch()
194+
bs.resetTimerCh <- struct{}{}
195+
}
196+
bs.mu.Unlock()
197+
<-batch.done
198+
return batch.err
199+
}
200+
201+
// updateActiveBatch update the active batch to the new merged request and context.
202+
// The context is only set once and is not updated after the first call.
203+
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
204+
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
205+
func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
206+
if bs.activeBatch.request == nil {
207+
bs.activeBatch.ctx = ctx
208+
}
209+
bs.activeBatch.request = req
210+
}
211+
212+
func (bs *batchSender) Shutdown(context.Context) error {
213+
bs.stopped.Store(true)
214+
close(bs.shutdownCh)
215+
// Wait for the active requests to finish.
216+
for bs.activeRequests.Load() > 0 {
217+
time.Sleep(10 * time.Millisecond)
218+
}
219+
return nil
220+
}

0 commit comments

Comments
 (0)