Skip to content

Commit 97f685e

Browse files
authored
[processor/deltatocumulative] limit tracked streams (#31488)
**Description:** Adds a configurable upper limit to the number of tracked streams. This allows introducing an upper bound on memory usage. **Testing:** A test case was added. **Documentation:** The README was updated.
1 parent 0672df1 commit 97f685e

File tree

10 files changed

+189
-8
lines changed

10 files changed

+189
-8
lines changed
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "deltatocumulativeprocessor"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: introduce configurable stream limit
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31488]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Adds a `max_streams` option that allows setting an upper bound (default = unlimited)
20+
to the number of tracked streams. Any additional streams exceeding the limit
21+
are dropped.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

internal/exp/metrics/staleness/staleness.go

+11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ import (
1313
// We override how Now() is returned, so we can have deterministic tests
1414
var NowFunc = time.Now
1515

16+
// Compile-time assertions that *Staleness satisfies both the streams.Map
// and streams.Evictor interfaces, so a mismatch fails the build instead of
// surfacing at a call site.
var (
	_ streams.Map[any] = (*Staleness[any])(nil)
	_ streams.Evictor  = (*Staleness[any])(nil)
)
20+
1621
// Staleness is a wrapper over a map that adds an additional "staleness" value to each entry. Users can
1722
// call ExpireOldEntries() to automatically remove all entries from the map whose staleness value is
1823
// older than the `max`
@@ -82,3 +87,9 @@ func (s *Staleness[T]) Next() time.Time {
8287
_, ts := s.pq.Peek()
8388
return ts
8489
}
90+
91+
// Evict implements streams.Evictor. It pops the head of the priority queue
// (the stream with the oldest staleness timestamp, per the same queue that
// Next() peeks at), deletes that stream from the underlying map, and returns
// the identity of the removed stream.
//
// NOTE(review): if the queue is empty, behavior depends on pq.Pop's
// empty-queue semantics — confirm callers only invoke this when non-empty.
func (s *Staleness[T]) Evict() identity.Stream {
	id, _ := s.pq.Pop()
	s.items.Delete(id)
	return id
}

internal/exp/metrics/streams/streams.go

+6
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,9 @@ func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
5050
// Len returns the number of streams currently tracked in the map.
func (m HashMap[T]) Len() int {
	// HashMap's underlying type is a map, so the built-in len applies
	// directly; no conversion is needed.
	return len(m)
}
53+
54+
// Evictors remove the "least important" stream based on some strategy such as
// the oldest, least active, etc. Implementations are consulted by limited
// maps to make room once a stream limit is reached.
type Evictor interface {
	// Evict removes one stream and returns the identity of the stream
	// that was removed.
	Evict() identity.Stream
}

processor/deltatocumulativeprocessor/README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ processors:
2525
deltatocumulative:
2626
# how long until a series not receiving new samples is removed
2727
[ max_stale: <duration> | default = 5m ]
28+
29+
# upper limit of streams to track. New streams exceeding this limit
30+
# will be dropped
31+
[ max_streams: <int> | default = 0 (off) ]
2832
```
29-
30-
There is no further configuration required. All delta samples are converted to cumulative.

processor/deltatocumulativeprocessor/config.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,26 @@ import (
1313
var _ component.ConfigValidator = (*Config)(nil)
1414

1515
// Config holds the settings of the deltatocumulative processor.
type Config struct {
	// MaxStale is how long a series may go without receiving new samples
	// before it is removed from tracking. Must be positive.
	MaxStale time.Duration `json:"max_stale"`
	// MaxStreams is the upper limit of streams to track. Zero disables
	// the limit entirely (unlimited streams), which is also the default.
	MaxStreams int `json:"max_streams"`
}

// Validate checks the configuration for invalid values.
func (c *Config) Validate() error {
	if c.MaxStale <= 0 {
		return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale)
	}
	// BUG FIX: MaxStreams defaults to 0, which means "limit disabled"
	// (see README: "default = 0 (off)" and the default config). The old
	// check `<= 0` rejected that default, making an untouched config fail
	// validation. Only negative values are invalid.
	if c.MaxStreams < 0 {
		return fmt.Errorf("max_streams must be a positive number (got %d)", c.MaxStreams)
	}
	return nil
}
29+
30+
func createDefaultConfig() component.Config {
31+
return &Config{
32+
MaxStale: 5 * time.Minute,
33+
34+
// disable. TODO: find good default
35+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603
36+
MaxStreams: 0,
37+
}
38+
}

processor/deltatocumulativeprocessor/factory.go

-5
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele
66
import (
77
"context"
88
"fmt"
9-
"time"
109

1110
"go.opentelemetry.io/collector/component"
1211
"go.opentelemetry.io/collector/consumer"
@@ -23,10 +22,6 @@ func NewFactory() processor.Factory {
2322
)
2423
}
2524

26-
func createDefaultConfig() component.Config {
27-
return &Config{MaxStale: 5 * time.Minute}
28-
}
29-
3025
func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) {
3126
pcfg, ok := cfg.(*Config)
3227
if !ok {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
10+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
11+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
12+
)
13+
14+
// Limit wraps m so that it tracks at most max streams. Stores of new
// streams beyond that bound are rejected (or trigger an eviction, if the
// returned LimitMap is given an Evictor).
func Limit[T any](m Map[T], max int) LimitMap[T] {
	return LimitMap[T]{Map: m, Max: max}
}
17+
18+
// LimitMap is a streams.Map that enforces an upper bound on the number of
// tracked streams.
type LimitMap[T any] struct {
	// Max is the maximum number of streams to track.
	Max int

	// Evictor, if non-nil, is asked to remove one stream to make room
	// when a new stream arrives at the limit. If nil, new streams beyond
	// the limit are rejected with ErrLimit.
	Evictor streams.Evictor
	streams.Map[T]
}
24+
25+
func (m LimitMap[T]) Store(id identity.Stream, v T) error {
26+
if m.Map.Len() < m.Max {
27+
return m.Map.Store(id, v)
28+
}
29+
30+
errl := ErrLimit(m.Max)
31+
if m.Evictor != nil {
32+
gone := m.Evictor.Evict()
33+
if err := m.Map.Store(id, v); err != nil {
34+
return err
35+
}
36+
return ErrEvicted{ErrLimit: errl, id: gone}
37+
}
38+
return errl
39+
}
40+
41+
// ErrLimit is returned by LimitMap.Store when the configured maximum
// number of streams has been reached. Its integer value is that limit.
type ErrLimit int

// Error implements the error interface.
func (e ErrLimit) Error() string {
	return fmt.Sprintf("stream limit of %d reached", int(e))
}
}
46+
47+
func AtLimit(err error) bool {
48+
var errLimit ErrLimit
49+
return errors.As(err, &errLimit)
50+
}
51+
52+
// ErrEvicted is returned by LimitMap.Store when the limit was reached but
// an Evictor made room: the write succeeded, and this error reports which
// stream was evicted for it.
type ErrEvicted struct {
	// Embedded ErrLimit carries the limit that was hit; Unwrap exposes it.
	ErrLimit
	// id identifies the stream that was evicted to make room.
	id Ident
}

// Error implements the error interface, combining the limit message with
// the identity of the evicted stream.
func (e ErrEvicted) Error() string {
	return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id)
}

// Unwrap exposes the embedded ErrLimit so errors.Is / errors.As (and
// AtLimit) match ErrEvicted values too.
func (e ErrEvicted) Unwrap() error {
	return e.ErrLimit
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package streams_test
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
12+
exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
16+
)
17+
18+
// TestLimit exercises LimitMap without an Evictor: stores below the limit
// succeed, the store that would exceed it fails with ErrLimit, and freeing
// a slot makes stores succeed again.
func TestLimit(t *testing.T) {
	sum := random.Sum()

	items := make(exp.HashMap[data.Number])
	lim := streams.Limit(items, 10)

	ids := make([]identity.Stream, 10)

	// write until limit must work
	for i := 0; i < 10; i++ {
		id, dp := sum.Stream()
		ids[i] = id
		err := lim.Store(id, dp)
		require.NoError(t, err)
	}

	// one over limit must be rejected
	{
		id, dp := sum.Stream()
		err := lim.Store(id, dp)
		// ErrorAs both asserts the error type and fills in the limit value.
		want := streams.ErrLimit(10)
		require.ErrorAs(t, err, &want)
		require.True(t, streams.AtLimit(err))
	}

	// after removing one, must be accepted again
	{
		lim.Delete(ids[0])

		id, dp := sum.Stream()
		err := lim.Store(id, dp)
		require.NoError(t, err)
	}
}

processor/deltatocumulativeprocessor/internal/streams/streams.go

+2
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,5 @@ func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) {
3333
v, _ := a.Map.Load(id)
3434
return v, err
3535
}
36+
37+
// Evictor re-exports the shared streams.Evictor interface so code in this
// package does not need to import the internal/exp/metrics/streams package.
type Evictor = streams.Evictor

processor/deltatocumulativeprocessor/processor.go

+7
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo
5555
proc.stale = stale
5656
dps = stale
5757
}
58+
if cfg.MaxStreams > 0 {
59+
lim := streams.Limit(dps, cfg.MaxStreams)
60+
if proc.exp != nil {
61+
lim.Evictor = proc.exp
62+
}
63+
dps = lim
64+
}
5865

5966
proc.aggr = streams.IntoAggregator(dps)
6067
return &proc

0 commit comments

Comments
 (0)