Skip to content

Commit 655bfa7

Browse files
authored
[processor/deltatocumulative]: observe accumulation metrics (#31363)
**Description:** <Describe what has changed.> <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> Uses the otel meter to observe some key metrics about the accumulation: | Name | Description | |------------------------------------------|---------------------------------------------------------------------------------------| | `streams_count` | Number of streams currently tracked by the aggregation state | | `datapoints_processed` | Total number of datapoints processed, whether successful or not | | `datapoints_dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | | `seconds_lost` | Total length of all gaps in the streams, which occur e.g. due to lost in transit | **Link to tracking Issue:** #30705 **Testing:** None **Documentation:** Readme updated
1 parent 13b2b03 commit 655bfa7

File tree

13 files changed

+565
-19
lines changed

13 files changed

+565
-19
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: deltatocumulativeprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: self-instrumentation to observe key metrics of the stream accumulation
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [30705]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

processor/deltatocumulativeprocessor/README.md

+17
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,21 @@ processors:
2929
# upper limit of streams to track. new streams exceeding this limit
3030
# will be dropped
3131
[ max_streams: <int> | default = 0 (off) ]
32+
3233
```
34+
35+
There is no further configuration required. All delta samples are converted to cumulative.
36+
37+
## Troubleshooting
38+
39+
The following metrics are recorded when [telemetry is
40+
enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry):
41+
42+
| Name | Description | Unit |
43+
|------------------------------------------|---------------------------------------------------------------------------------------|---------------|
44+
| `deltatocumulative.streams.tracked` | Number of streams currently tracked by the aggregation state | `{stream}` |
45+
| `deltatocumulative.streams.limit` | Upper limit of tracked streams | `{stream}` |
46+
| `deltatocumulative.streams.evicted` | Number of streams removed from tracking to ingest newer streams | `{stream}` |
47+
| `deltatocumulative.datapoints.processed` | Total number of datapoints processed, whether successful or not | `{datapoint}` |
48+
| `deltatocumulative.datapoints.dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | `{datapoint}` |
49+
| `deltatocumulative.gaps.length` | Total length of all gaps in the streams, which occur e.g. due to samples lost in transit | `second` |

processor/deltatocumulativeprocessor/factory.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,6 @@ func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg
2828
return nil, fmt.Errorf("configuration parsing error")
2929
}
3030

31-
return newProcessor(pcfg, set.Logger, next), nil
31+
meter := metadata.Meter(set.TelemetrySettings)
32+
return newProcessor(pcfg, set.Logger, meter, next), nil
3233
}

processor/deltatocumulativeprocessor/go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
go.opentelemetry.io/collector/consumer v0.97.1-0.20240409140257-792fac1b62d4
1111
go.opentelemetry.io/collector/pdata v1.4.1-0.20240409140257-792fac1b62d4
1212
go.opentelemetry.io/collector/processor v0.97.1-0.20240409140257-792fac1b62d4
13+
go.opentelemetry.io/otel v1.24.0
1314
go.opentelemetry.io/otel/metric v1.24.0
1415
go.opentelemetry.io/otel/trace v1.24.0
1516
go.uber.org/zap v1.27.0
@@ -39,9 +40,9 @@ require (
3940
github.com/prometheus/client_model v0.6.1 // indirect
4041
github.com/prometheus/common v0.48.0 // indirect
4142
github.com/prometheus/procfs v0.12.0 // indirect
43+
go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 // indirect
4244
go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 // indirect
4345
go.opentelemetry.io/collector/pdata/testdata v0.0.0-20240408153657-fc289290613a // indirect
44-
go.opentelemetry.io/otel v1.24.0 // indirect
4546
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
4647
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
4748
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect

processor/deltatocumulativeprocessor/go.sum

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/deltatocumulativeprocessor/internal/delta/delta.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,17 @@ func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
4444
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
4545
}
4646

47+
// detect gaps
48+
var gap error
49+
if dp.StartTimestamp() > aggr.Timestamp() {
50+
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
51+
}
52+
4753
res := aggr.Add(dp)
48-
return a.Map.Store(id, res)
54+
if err := a.Map.Store(id, res); err != nil {
55+
return err
56+
}
57+
return gap
4958
}
5059

5160
type ErrOlderStart struct {
@@ -65,3 +74,11 @@ type ErrOutOfOrder struct {
6574
// Error implements the error interface, reporting both the timestamp of
// the rejected sample and the timestamp the series is already at.
func (e ErrOutOfOrder) Error() string {
	const msg = "out of order: dropped sample from time=%s, because series is already at time=%s"
	return fmt.Sprintf(msg, e.Sample, e.Last)
}
77+
78+
type ErrGap struct {
79+
From, To pcommon.Timestamp
80+
}
81+
82+
func (e ErrGap) Error() string {
83+
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// maybe provides utilities for representing data that may or may not exist at
5+
// runtime in a safe way.
6+
//
7+
// A typical approach to this are pointers, but they suffer from two issues:
8+
// - Unsafety: permitting nil pointers must require careful checking on each use,
9+
// which is easily forgotten
10+
// - Blindness: nil itself cannot differentiate between "set to nil" and
11+
// "not set all", leading to unexepcted edge cases
12+
//
13+
// The [Ptr] type of this package provides a safe alternative with a clear
14+
// distinction between "not set" and "set to nil".
15+
package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
16+
17+
// Ptr holds a reference to a value of type T that may or may not be present.
// The underlying pointer is reachable only through [Ptr.Try], which forces
// callers to check (or explicitly ignore) the accompanying ok flag, cleanly
// separating "not set" from "explicitly set to nil".
//
// Construct values with [Some] and [None].
type Ptr[T any] struct {
	to *T
	ok bool
}

// None returns the "not-set" Ptr, which is identical to the zero value of
// Ptr[T].
func None[T any]() Ptr[T] {
	var unset Ptr[T]
	return unset
}

// Some wraps ptr into a set Ptr.
//
// Passing a nil ptr is valid and represents a value that was explicitly set
// to nil.
func Some[T any](ptr *T) Ptr[T] {
	return Ptr[T]{to: ptr, ok: true}
}

// Try de-references the Ptr. The three possible outcomes are:
//
//   - nil, false: not-set
//   - nil, true: explicitly set to nil
//   - non-nil, true: set to some value
//
// Because the compiler forces ok to be handled, this is safer than a bare
// pointer.
func (p Ptr[T]) Try() (_ *T, ok bool) {
	return p.to, p.ok
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package maybe_test
5+
6+
import (
7+
"fmt"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
13+
)
14+
15+
func TestMaybe(t *testing.T) {
16+
t.Run("zero-not-ok", func(t *testing.T) {
17+
var ptr maybe.Ptr[int]
18+
_, ok := ptr.Try()
19+
require.False(t, ok)
20+
})
21+
t.Run("none-not-ok", func(t *testing.T) {
22+
ptr := maybe.None[int]()
23+
_, ok := ptr.Try()
24+
require.False(t, ok)
25+
})
26+
t.Run("explicit-nil", func(t *testing.T) {
27+
ptr := maybe.Some[int](nil)
28+
v, ok := ptr.Try()
29+
require.Nil(t, v)
30+
require.True(t, ok)
31+
})
32+
t.Run("value", func(t *testing.T) {
33+
num := 42
34+
ptr := maybe.Some(&num)
35+
v, ok := ptr.Try()
36+
require.True(t, ok)
37+
require.Equal(t, num, *v)
38+
})
39+
}
40+
41+
func ExamplePtr() {
42+
var unset maybe.Ptr[int] // = maybe.None()
43+
if v, ok := unset.Try(); ok {
44+
fmt.Println("unset:", v)
45+
} else {
46+
fmt.Println("unset: !ok")
47+
}
48+
49+
var xnil maybe.Ptr[int] = maybe.Some[int](nil)
50+
if v, ok := xnil.Try(); ok {
51+
fmt.Println("explicit nil:", v)
52+
}
53+
54+
num := 42
55+
var set maybe.Ptr[int] = maybe.Some(&num)
56+
if v, ok := set.Try(); ok {
57+
fmt.Println("set:", *v)
58+
}
59+
60+
// Output:
61+
// unset: !ok
62+
// explicit nil: <nil>
63+
// set: 42
64+
}

processor/deltatocumulativeprocessor/internal/streams/limit.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ type LimitMap[T any] struct {
2323
}
2424

2525
func (m LimitMap[T]) Store(id identity.Stream, v T) error {
26-
if m.Map.Len() < m.Max {
26+
_, ok := m.Map.Load(id)
27+
avail := m.Map.Len() < m.Max
28+
if ok || avail {
2729
return m.Map.Store(id, v)
2830
}
2931

@@ -33,7 +35,7 @@ func (m LimitMap[T]) Store(id identity.Stream, v T) error {
3335
if err := m.Map.Store(id, v); err != nil {
3436
return err
3537
}
36-
return ErrEvicted{ErrLimit: errl, id: gone}
38+
return ErrEvicted{ErrLimit: errl, Ident: gone}
3739
}
3840
return errl
3941
}
@@ -51,13 +53,9 @@ func AtLimit(err error) bool {
5153

5254
type ErrEvicted struct {
5355
ErrLimit
54-
id Ident
56+
Ident Ident
5557
}
5658

5759
func (e ErrEvicted) Error() string {
58-
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id)
59-
}
60-
61-
func (e ErrEvicted) Unwrap() error {
62-
return e.ErrLimit
60+
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident)
6361
}

processor/deltatocumulativeprocessor/internal/streams/limit_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ func TestLimit(t *testing.T) {
2222
lim := streams.Limit(items, 10)
2323

2424
ids := make([]identity.Stream, 10)
25+
dps := make([]data.Number, 10)
2526

2627
// write until limit must work
2728
for i := 0; i < 10; i++ {
2829
id, dp := sum.Stream()
2930
ids[i] = id
31+
dps[i] = dp
3032
err := lim.Store(id, dp)
3133
require.NoError(t, err)
3234
}
@@ -40,6 +42,12 @@ func TestLimit(t *testing.T) {
4042
require.True(t, streams.AtLimit(err))
4143
}
4244

45+
// write to existing must work
46+
{
47+
err := lim.Store(ids[3], dps[3])
48+
require.NoError(t, err)
49+
}
50+
4351
// after removing one, must be accepted again
4452
{
4553
lim.Delete(ids[0])

0 commit comments

Comments
 (0)