Skip to content

Commit 8106ad8

Browse files
committed
[exporterhelper] Introduce batching functionality
This change introduces new experimental batching functionality to the exporter helper
1 parent cc88aee commit 8106ad8

File tree

6 files changed

+922
-11
lines changed

6 files changed

+922
-11
lines changed

.chloggen/batch-exporter-helper.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add experimental batching capabilities to the exporter helper
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [8122]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
13+
"go.uber.org/zap"
14+
15+
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/exporter"
17+
)
18+
19+
// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
20+
// items.
21+
// This API is at the early stage of development and may change without backward compatibility
22+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
23+
type MergeBatcherConfig struct {
24+
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
25+
Enabled bool `mapstructure:"enabled"`
26+
27+
// Timeout sets the time after which a batch will be sent regardless of its size.
28+
// When this is set to zero, data will be sent immediately.
29+
// This is a recommended option, as it will ensure that the data is sent in a timely manner.
30+
Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout?
31+
32+
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
33+
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
34+
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
35+
MinSizeItems int `mapstructure:"min_size_items"`
36+
}
37+
38+
func (c MergeBatcherConfig) Validate() error {
39+
if c.MinSizeItems < 0 {
40+
return errors.New("min_size_items must be greater than or equal to zero")
41+
}
42+
if c.Timeout <= 0 {
43+
return errors.New("timeout must be greater than zero")
44+
}
45+
return nil
46+
}
47+
48+
func NewDefaultMergeBatcherConfig() MergeBatcherConfig {
49+
return MergeBatcherConfig{
50+
Enabled: true,
51+
Timeout: 200 * time.Millisecond,
52+
MinSizeItems: 8192,
53+
}
54+
}
55+
56+
// SplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
57+
// minimum and maximum number of items.
58+
// This API is at the early stage of development and may change without backward compatibility
59+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
60+
type SplitBatcherConfig struct {
61+
// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP,
62+
// but can be anything else for other formats. If the batch size exceeds this value,
63+
// it will be broken up into smaller batches if possible.
64+
// Setting this value to zero disables the maximum size limit.
65+
MaxSizeItems int `mapstructure:"max_size_items"`
66+
}
67+
68+
func (c SplitBatcherConfig) Validate() error {
69+
if c.MaxSizeItems < 0 {
70+
return errors.New("max_size_items must be greater than or equal to zero")
71+
}
72+
return nil
73+
}
74+
75+
// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
76+
// minimum and maximum number of items.
77+
// This API is at the early stage of development and may change without backward compatibility
78+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
79+
type MergeSplitBatcherConfig struct {
80+
MergeBatcherConfig `mapstructure:",squash"`
81+
SplitBatcherConfig `mapstructure:",squash"`
82+
}
83+
84+
func (c MergeSplitBatcherConfig) Validate() error {
85+
if c.MaxSizeItems < c.MinSizeItems {
86+
return errors.New("max_size_items must be greater than or equal to min_size_items")
87+
}
88+
return nil
89+
}
90+
91+
// BatchConfigBatchersLimit defines batching configuration part for setting a maximum number of batchers.
92+
// This API is at the early stage of development and may change without backward compatibility
93+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
94+
type BatchConfigBatchersLimit struct {
95+
// BatchersLimit is the maximum number of batchers that can be used for batching.
96+
// Requests producing batch identifiers that exceed this limit will be dropped.
97+
// If this value is zero, then there is no limit on the number of batchers.
98+
BatchersLimit int `mapstructure:"batchers_limit"`
99+
}
100+
101+
// BatchMergeFunc is a function that merges two requests into a single request.
102+
// Context will be propagated from the first request.
103+
// Do not mutate the requests passed to the function if error can be returned after mutation.
104+
// This API is at the early stage of development and may change without backward compatibility
105+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
106+
type BatchMergeFunc func(context.Context, Request, Request) (Request, error)
107+
108+
// BatchMergeSplitFunc is a function that merge and/or splits a request into multiple requests based on the provided
109+
// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the
110+
// maximum number of items. Size of the last returned request MUST be less or equal than the size of any other returned
111+
// request. The original request MUST not be mutated if error is returned. The length of the returned slice MUST not
112+
// be 0. The optionalReq argument can be nil, make sure to check it before using. maxItems argument is guaranteed to be
113+
// greater than 0. Context will be propagated from the original request.
114+
// This API is at the early stage of development and may change without backward compatibility
115+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
116+
type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error)
117+
118+
type BatcherOption func(*batchSender)
119+
120+
func WithSplitBatcher(cfg SplitBatcherConfig, msf BatchMergeSplitFunc) BatcherOption {
121+
return func(b *batchSender) {
122+
if cfg.MaxSizeItems != 0 {
123+
b.splitCfg = cfg
124+
b.mergeSplitFunc = msf
125+
}
126+
}
127+
}
128+
129+
// batchSender is a component that accepts places requests into batches before passing them to the downstream senders.
130+
//
131+
// batch_processor implements consumer.Traces and consumer.Metrics
132+
//
133+
// Batches are sent out with any of the following conditions:
134+
// - batch size reaches cfg.SendBatchSize
135+
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
136+
type batchSender struct {
137+
baseRequestSender
138+
mergeCfg MergeBatcherConfig
139+
splitCfg SplitBatcherConfig
140+
mergeFunc BatchMergeFunc
141+
mergeSplitFunc BatchMergeSplitFunc
142+
143+
// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
144+
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
145+
// Populated from the number of queue consumers if queue is enabled.
146+
concurrencyLimit uint64
147+
activeRequests atomic.Uint64
148+
149+
resetTimerCh chan struct{}
150+
151+
mu sync.Mutex
152+
activeBatch *batch
153+
154+
logger *zap.Logger
155+
156+
shutdownCh chan struct{}
157+
stopped *atomic.Bool
158+
}
159+
160+
// newBatchSender returns a new batch consumer component.
161+
func newBatchSender(cfg MergeBatcherConfig, set exporter.CreateSettings, mf BatchMergeFunc, opts ...BatcherOption) requestSender {
162+
bs := &batchSender{
163+
activeBatch: newEmptyBatch(),
164+
mergeCfg: cfg,
165+
mergeFunc: mf,
166+
logger: set.Logger,
167+
shutdownCh: make(chan struct{}),
168+
stopped: &atomic.Bool{},
169+
resetTimerCh: make(chan struct{}),
170+
}
171+
172+
for _, op := range opts {
173+
op(bs)
174+
}
175+
return bs
176+
}
177+
178+
func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
179+
timer := time.NewTimer(bs.mergeCfg.Timeout)
180+
go func() {
181+
for {
182+
select {
183+
case <-bs.shutdownCh:
184+
bs.mu.Lock()
185+
if bs.activeBatch.request != nil {
186+
bs.exportActiveBatch()
187+
}
188+
bs.mu.Unlock()
189+
if !timer.Stop() {
190+
<-timer.C
191+
}
192+
return
193+
case <-timer.C:
194+
bs.mu.Lock()
195+
if bs.activeBatch.request != nil {
196+
bs.exportActiveBatch()
197+
}
198+
bs.mu.Unlock()
199+
timer.Reset(bs.mergeCfg.Timeout)
200+
case <-bs.resetTimerCh:
201+
if !timer.Stop() {
202+
<-timer.C
203+
}
204+
timer.Reset(bs.mergeCfg.Timeout)
205+
}
206+
}
207+
}()
208+
209+
return nil
210+
}
211+
212+
type batch struct {
213+
ctx context.Context
214+
request Request
215+
done chan struct{}
216+
err error
217+
}
218+
219+
func newEmptyBatch() *batch {
220+
return &batch{
221+
ctx: context.Background(),
222+
done: make(chan struct{}),
223+
}
224+
}
225+
226+
// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
227+
// Caller must hold the lock.
228+
func (bs *batchSender) exportActiveBatch() {
229+
go func(b *batch) {
230+
b.err = b.request.Export(b.ctx)
231+
close(b.done)
232+
}(bs.activeBatch)
233+
bs.activeBatch = newEmptyBatch()
234+
}
235+
236+
// isActiveBatchReady returns true if the active batch is ready to be exported.
237+
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
238+
// Caller must hold the lock.
239+
func (bs *batchSender) isActiveBatchReady() bool {
240+
return bs.activeBatch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems ||
241+
(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
242+
}
243+
244+
func (bs *batchSender) send(ctx context.Context, req Request) error {
245+
// Stopped batch sender should act as pass-through to allow the queue to be drained.
246+
if bs.stopped.Load() {
247+
return bs.nextSender.send(ctx, req)
248+
}
249+
250+
bs.activeRequests.Add(1)
251+
defer bs.activeRequests.Add(^uint64(0))
252+
253+
if bs.mergeSplitFunc != nil {
254+
return bs.sendMergeSplitBatch(ctx, req)
255+
}
256+
return bs.sendMergeBatch(ctx, req)
257+
}
258+
259+
// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
260+
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
261+
bs.mu.Lock()
262+
263+
reqs, err := bs.mergeSplitFunc(ctx, bs.activeBatch.request, req, bs.splitCfg.MaxSizeItems)
264+
if err != nil || len(reqs) == 0 {
265+
bs.mu.Unlock()
266+
return err
267+
}
268+
if len(reqs) == 1 || bs.activeBatch.request != nil {
269+
bs.updateActiveBatch(ctx, reqs[0])
270+
batch := bs.activeBatch
271+
if bs.isActiveBatchReady() || len(reqs) > 1 {
272+
bs.exportActiveBatch()
273+
bs.resetTimerCh <- struct{}{}
274+
}
275+
bs.mu.Unlock()
276+
<-batch.done
277+
if batch.err != nil {
278+
return batch.err
279+
}
280+
reqs = reqs[1:]
281+
} else {
282+
bs.mu.Unlock()
283+
}
284+
285+
// Intentionally do not put the last request in the active batch to not block it.
286+
// TODO: Consider including the partial request in the error to avoid double publishing.
287+
for _, r := range reqs {
288+
if err := r.Export(ctx); err != nil {
289+
return err
290+
}
291+
}
292+
return nil
293+
}
294+
295+
// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
296+
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
297+
bs.mu.Lock()
298+
if bs.activeBatch.request != nil {
299+
var err error
300+
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
301+
if err != nil {
302+
bs.mu.Unlock()
303+
return err
304+
}
305+
}
306+
bs.updateActiveBatch(ctx, req)
307+
batch := bs.activeBatch
308+
if bs.isActiveBatchReady() {
309+
bs.exportActiveBatch()
310+
bs.resetTimerCh <- struct{}{}
311+
}
312+
bs.mu.Unlock()
313+
<-batch.done
314+
return batch.err
315+
}
316+
317+
// updateActiveBatch update the active batch to the new merged request and context.
318+
// The context is only set once and is not updated after the first call.
319+
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
320+
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
321+
func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
322+
if bs.activeBatch.request == nil {
323+
bs.activeBatch.ctx = ctx
324+
}
325+
bs.activeBatch.request = req
326+
}
327+
328+
func (bs *batchSender) Shutdown(context.Context) error {
329+
bs.stopped.Store(true)
330+
close(bs.shutdownCh)
331+
// Wait for the active requests to finish.
332+
for bs.activeRequests.Load() > 0 {
333+
time.Sleep(10 * time.Millisecond)
334+
}
335+
return nil
336+
}

0 commit comments

Comments
 (0)