Skip to content

Commit 785c752

Browse files
committed
[exporterhelper] Introduce batching functionality
This change introduces new experimental batching functionality to the exporter helper
1 parent cefe1b7 commit 785c752

File tree

7 files changed

+684
-25
lines changed

7 files changed

+684
-25
lines changed

.chloggen/batch-exporter-helper.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add experimental batching capabilities to the exporter helper
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [8122]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
13+
"go.uber.org/zap"
14+
15+
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/exporter"
17+
)
18+
19+
// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
20+
// items.
21+
// This API is at the early stage of development and may change without backward compatibility
22+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
23+
type MergeBatcherConfig struct {
24+
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
25+
Enabled bool `mapstructure:"enabled"`
26+
27+
// Timeout sets the time after which a batch will be sent regardless of its size.
28+
// When this is set to zero, data will be sent immediately.
29+
// This is a recommended option, as it will ensure that the data is sent in a timely manner.
30+
Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout?
31+
32+
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
33+
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
34+
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
35+
MinSizeItems int `mapstructure:"min_size_items"`
36+
}
37+
38+
func (c MergeBatcherConfig) Validate() error {
39+
if c.MinSizeItems < 0 {
40+
return errors.New("min_size_items must be greater than or equal to zero")
41+
}
42+
if c.Timeout <= 0 {
43+
return errors.New("timeout must be greater than zero")
44+
}
45+
return nil
46+
}
47+
48+
func NewDefaultMergeBatcherConfig() MergeBatcherConfig {
49+
return MergeBatcherConfig{
50+
Enabled: true,
51+
Timeout: 200 * time.Millisecond,
52+
MinSizeItems: 8192,
53+
}
54+
}
55+
56+
// SplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
// minimum and maximum number of items.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type SplitBatcherConfig struct {
	// MaxSizeItems caps the number of batch items, i.e. spans, data points or log records for OTLP,
	// but can be anything else for other formats. A batch exceeding this value
	// is broken up into smaller batches when possible.
	// Setting this value to zero disables the maximum size limit.
	MaxSizeItems int `mapstructure:"max_size_items"`
}

// Validate checks the configuration for invalid values.
func (c SplitBatcherConfig) Validate() error {
	if c.MaxSizeItems >= 0 {
		return nil
	}
	return errors.New("max_size_items must be greater than or equal to zero")
}
74+
75+
// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
76+
// minimum and maximum number of items.
77+
// This API is at the early stage of development and may change without backward compatibility
78+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
79+
type MergeSplitBatcherConfig struct {
80+
MergeBatcherConfig `mapstructure:",squash"`
81+
SplitBatcherConfig `mapstructure:",squash"`
82+
}
83+
84+
func (c MergeSplitBatcherConfig) Validate() error {
85+
if c.MaxSizeItems < c.MinSizeItems {
86+
return errors.New("max_size_items must be greater than or equal to min_size_items")
87+
}
88+
return nil
89+
}
90+
91+
// BatchConfigBatchersLimit defines the batching configuration part for setting a maximum number of batchers.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchConfigBatchersLimit struct {
	// BatchersLimit is the maximum number of batchers that can be used for batching.
	// Requests producing batch identifiers that exceed this limit will be dropped.
	// If this value is zero, then there is no limit on the number of batchers.
	BatchersLimit int `mapstructure:"batchers_limit"`
}
100+
101+
// BatchMergeFunc is a function that merges two requests into a single request.
// Context will be propagated from the first request. If you want to separate batches based on the context,
// use the WithRequestBatchIdentifier option.
// Do not mutate the requests passed to the function if an error can be returned after mutation.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc func(context.Context, Request, Request) (Request, error)

// BatchMergeSplitFunc is a function that merges and/or splits a request into multiple requests based on the provided
// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the
// maximum number of items. Size of the last returned request MUST be less or equal than the size of any other returned
// request. The original request MUST not be mutated if error is returned. The length of the returned slice MUST not
// be 0. The optionalReq argument can be nil, make sure to check it before using. maxItems argument is guaranteed to be
// greater than 0. Context will be propagated from the original request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error)

// BatcherOption applies an optional configuration change to a batchSender.
type BatcherOption func(*batchSender)
120+
121+
func WithSplitBatcher(cfg SplitBatcherConfig, msf BatchMergeSplitFunc) BatcherOption {
122+
return func(b *batchSender) {
123+
if cfg.MaxSizeItems != 0 {
124+
b.splitCfg = cfg
125+
b.mergeSplitFunc = msf
126+
}
127+
}
128+
}
129+
130+
// batchSender is a component that accepts requests, places them into batches and
// passes the batches to the downstream senders.
//
// A pending (active) batch is sent out with any of the following conditions:
//   - its item count reaches mergeCfg.MinSizeItems (size-triggered flush in the send paths)
//   - mergeCfg.Timeout has elapsed since the previous batch was sent out (timer goroutine in Start).
type batchSender struct {
	baseRequestSender
	mergeCfg       MergeBatcherConfig
	splitCfg       SplitBatcherConfig
	mergeFunc      BatchMergeFunc
	mergeSplitFunc BatchMergeSplitFunc

	// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
	// If this number is reached and all the goroutines are busy, the batch will be sent right away.
	// Populated from the number of queue consumers if queue is enabled.
	concurrencyLimit int
	// activeRequestsWG tracks in-flight send() calls so Shutdown can wait for them to drain.
	activeRequestsWG sync.WaitGroup

	// resetTimerCh signals the timer goroutine that a size-triggered flush just
	// happened and the timeout countdown should restart.
	resetTimerCh chan struct{}

	// mu guards activeBatch.
	mu          sync.Mutex
	activeBatch *batch

	logger *zap.Logger

	// shutdownCh is closed by Shutdown to stop the timer goroutine (flushing any pending batch).
	shutdownCh chan struct{}
	// stopped makes send() reject new requests once Shutdown has started.
	stopped *atomic.Bool
}
160+
161+
// newBatchSender returns a new batch consumer component.
162+
func newBatchSender(cfg MergeBatcherConfig, set exporter.CreateSettings, mf BatchMergeFunc, opts ...BatcherOption) requestSender {
163+
bs := &batchSender{
164+
activeBatch: newEmptyBatch(),
165+
mergeCfg: cfg,
166+
mergeFunc: mf,
167+
logger: set.Logger,
168+
shutdownCh: make(chan struct{}),
169+
stopped: &atomic.Bool{},
170+
resetTimerCh: make(chan struct{}),
171+
}
172+
173+
for _, op := range opts {
174+
op(bs)
175+
}
176+
return bs
177+
}
178+
179+
// Start launches the background goroutine that flushes the active batch when the
// timeout elapses, and stops it — flushing any pending data — once shutdownCh is closed.
func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
	timer := time.NewTimer(bs.mergeCfg.Timeout)
	go func() {
		for {
			select {
			case <-bs.shutdownCh:
				// Shutdown: export whatever is pending so accepted data is not lost.
				bs.mu.Lock()
				if bs.activeBatch.request != nil {
					bs.exportActiveBatch()
				}
				bs.mu.Unlock()
				return
			case <-timer.C:
				// Timeout elapsed: flush the pending batch (if any) and restart the countdown.
				bs.mu.Lock()
				if bs.activeBatch.request != nil {
					bs.exportActiveBatch()
				}
				bs.mu.Unlock()
				timer.Reset(bs.mergeCfg.Timeout)
			case <-bs.resetTimerCh:
				// A size-triggered flush happened elsewhere; restart the countdown.
				// Drain timer.C if the timer already fired so Reset is safe.
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(bs.mergeCfg.Timeout)
			}
		}
	}()

	return nil
}
209+
210+
// batch holds the request being accumulated, the context it will be exported with,
// and the synchronization used to report the export result back to every send()
// call whose data was merged into it.
type batch struct {
	ctx     context.Context
	request Request
	// done is closed once the batch has been exported; err then holds the result.
	done chan struct{}
	err  error
}

// newEmptyBatch returns a batch with no request yet and a fresh done channel.
func newEmptyBatch() *batch {
	return &batch{
		ctx:  context.Background(),
		done: make(chan struct{}),
	}
}
223+
224+
// exportActiveBatch starts exporting the current active batch on a new goroutine
// and installs a fresh empty batch in its place. The export result is published by
// setting b.err and closing b.done, which unblocks every sender waiting on this batch.
// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
	go func(b *batch) {
		b.err = b.request.Export(b.ctx)
		close(b.done)
	}(bs.activeBatch)
	bs.activeBatch = newEmptyBatch()
}
232+
233+
func (bs *batchSender) send(ctx context.Context, req Request) error {
234+
if bs.stopped.Load() {
235+
return errors.New("batchSender is stopped")
236+
}
237+
238+
bs.activeRequestsWG.Add(1)
239+
defer bs.activeRequestsWG.Done()
240+
241+
if bs.mergeSplitFunc != nil {
242+
return bs.sendMergeSplitBatch(ctx, req)
243+
}
244+
return bs.sendMergeBatch(ctx, req)
245+
}
246+
247+
// sendMergeSplitBatch merges the incoming request with the active batch and splits
// the result into size-capped requests. The first resulting request (or the merged
// whole, when nothing was split) stays in / flushes through the active batch; any
// remaining split requests are exported synchronously by this caller.
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
	bs.mu.Lock()

	reqs, err := bs.mergeSplitFunc(ctx, bs.activeBatch.request, req, bs.splitCfg.MaxSizeItems)
	if err != nil || len(reqs) == 0 {
		bs.mu.Unlock()
		return err
	}
	if len(reqs) == 1 || bs.activeBatch.request != nil {
		// Keep the first merged request in the active batch so later sends can merge into it.
		bs.activeBatch.request = reqs[0]
		bs.activeBatch.ctx = ctx
		batch := bs.activeBatch
		var sent bool
		// Flush right away if the size threshold is reached, or if a split occurred
		// (len(reqs) > 1 means the first request is already at the maximum size).
		if batch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems || len(reqs) > 1 {
			bs.exportActiveBatch()
			sent = true
		}
		bs.mu.Unlock()
		if sent {
			// Notify the timer goroutine outside the lock to restart the timeout countdown.
			bs.resetTimerCh <- struct{}{}
		}
		// Wait for the batch (flushed here or by the timer goroutine) to finish exporting.
		<-batch.done
		if batch.err != nil {
			return batch.err
		}
		reqs = reqs[1:]
	} else {
		bs.mu.Unlock()
	}

	// Intentionally do not put the last request in the active batch to not block it.
	// TODO: Consider including the partial request in the error to avoid double publishing.
	for _, r := range reqs {
		if err := r.Export(ctx); err != nil {
			return err
		}
	}
	return nil
}
286+
287+
// sendMergeBatch merges the incoming request into the active batch, flushes the
// batch immediately if the size threshold is reached, and then blocks until the
// batch (flushed here or by the timer goroutine) has been exported, returning its result.
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
	bs.mu.Lock()

	if bs.activeBatch.request != nil {
		var err error
		req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
		if err != nil {
			bs.mu.Unlock()
			return err
		}
	}
	bs.activeBatch.request = req
	bs.activeBatch.ctx = ctx
	// Remember the batch we joined: activeBatch may be replaced after the flush below.
	batch := bs.activeBatch
	var sent bool
	if bs.activeBatch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems {
		bs.exportActiveBatch()
		sent = true
	}
	bs.mu.Unlock()
	if sent {
		// Notify the timer goroutine outside the lock to restart the timeout countdown.
		bs.resetTimerCh <- struct{}{}
	}
	// Block until this batch has been exported and propagate the export result.
	<-batch.done
	return batch.err
}
313+
314+
// Shutdown stops accepting new requests, signals the timer goroutine to flush the
// remaining active batch, and waits until all in-flight send() calls have completed.
func (bs *batchSender) Shutdown(context.Context) error {
	bs.stopped.Store(true)
	close(bs.shutdownCh)
	// Wait for all in-flight sends to drain before reporting shutdown complete.
	bs.activeRequestsWG.Wait()
	return nil
}

0 commit comments

Comments
 (0)