From 554ba61644559c09c296fa5ac7cca77b07137d25 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 5 Mar 2024 17:02:08 +0100 Subject: [PATCH 01/18] deltatocumulative: metrics --- .../deltatocumulativeprocessor/config.go | 2 +- .../deltatocumulativeprocessor/factory.go | 3 +- .../internal/delta/delta.go | 19 ++- .../internal/telemetry/metrics.go | 135 ++++++++++++++++++ .../deltatocumulativeprocessor/processor.go | 5 +- 5 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/telemetry/metrics.go diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b5744a9779b71..d33f357dba7a7 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -13,7 +13,7 @@ import ( var _ component.ConfigValidator = (*Config)(nil) type Config struct { - MaxStale time.Duration `json:"max_stale"` + MaxStale time.Duration `mapstructure:"max_stale"` } func (c *Config) Validate() error { diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index b2fba4e00fc2a..4366a3119575b 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -33,5 +33,6 @@ func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg return nil, fmt.Errorf("configuration parsing error") } - return newProcessor(pcfg, set.Logger, next), nil + meter := metadata.Meter(set.TelemetrySettings) + return newProcessor(pcfg, set.Logger, meter, next), nil } diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index fac787e9ece46..a747d61419a1c 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -44,8 +44,17 @@ func (a Accumulator[D]) Store(id streams.Ident, 
dp D) error { return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} } + // detect gaps + var gap error + if dp.StartTimestamp() > aggr.Timestamp() { + gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()} + } + res := aggr.Add(dp) - return a.Map.Store(id, res) + if err := a.Map.Store(id, res); err != nil { + return err + } + return gap } type ErrOlderStart struct { @@ -65,3 +74,11 @@ type ErrOutOfOrder struct { func (e ErrOutOfOrder) Error() string { return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last) } + +type ErrGap struct { + From, To pcommon.Timestamp +} + +func (e ErrGap) Error() string { + return fmt.Sprintf("gap in stream from %s to %s", e.From, e.To) +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go new file mode 100644 index 0000000000000..0404363a97334 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -0,0 +1,135 @@ +package telemetry + +import ( + "context" + "errors" + "reflect" + "strings" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type Metrics struct { + streams metric.Int64UpDownCounter + total metric.Int64Counter + dropped metric.Int64Counter + lost metric.Int64Counter +} + +func metrics(meter metric.Meter) Metrics { + var ( + count = use(meter.Int64Counter) + updown = use(meter.Int64UpDownCounter) + ) + + return Metrics{ + streams: updown("streams", + metric.WithDescription("number 
of streams tracked"), + metric.WithUnit("{stream}"), + ), + total: count("datapoints.processed", + metric.WithDescription("number of datapoints processed"), + metric.WithUnit("{datapoint}"), + ), + dropped: count("datapoints.dropped", + metric.WithDescription("number of dropped datapoints due to given 'reason'"), + metric.WithUnit("{datapoint}"), + ), + lost: count("lost", + metric.WithDescription("total duration where data was expected but not received"), + metric.WithUnit("s"), + ), + } +} + +func Observe[T any](items streams.Map[T], meter metric.Meter) streams.Map[T] { + return &Map[T]{ + Map: items, + Metrics: metrics(meter), + } +} + +var _ streams.Map[any] = (*Map[any])(nil) + +type Map[T any] struct { + streams.Map[T] + + Metrics +} + +func (m *Map[T]) Store(id streams.Ident, v T) error { + inc(m.total) + _, old := m.Load(id) + + var ( + olderStart *delta.ErrOlderStart + outOfOrder *delta.ErrOutOfOrder + gap *delta.ErrGap + ) + + err := m.Map.Store(id, v) + switch { + case err == nil: + // all good + case errors.As(err, olderStart): + // non fatal. record but ignore + inc(m.dropped, reason(olderStart)) + err = nil + case errors.As(err, outOfOrder): + // non fatal. record but ignore + inc(m.dropped, reason(outOfOrder)) + err = nil + case errors.As(err, gap): + // a gap occured. record its length, but ignore + from := gap.From.AsTime() + to := gap.To.AsTime() + lost := to.Sub(from).Seconds() + m.lost.Add(nil, int64(lost)) + err = nil + } + + // not dropped and not seen before => new stream + if err == nil && !old { + inc(m.streams) + } + return err +} + +func (m *Map[T]) Delete(id streams.Ident) { + dec(m.streams) + m.Map.Delete(id) +} + +type addable[Opts any] interface { + Add(context.Context, int64, ...Opts) +} + +func inc[A addable[O], O any](a A, opts ...O) { + a.Add(nil, 1, opts...) +} + +func dec[A addable[O], O any](a A, opts ...O) { + a.Add(nil, -1, opts...) 
+} + +func reason[E error](err *E) metric.AddOption { + reason := reflect.TypeOf(*new(E)).Name() + reason = strings.TrimPrefix(reason, "Err") + return metric.WithAttributes(attribute.String("reason", reason)) +} + +func use[F func(string, ...O) (M, error), M any, O any](f F) func(string, ...O) M { + return func(name string, opts ...O) M { + name = processorhelper.BuildCustomMetricName(metadata.Type.String(), name) + m, err := f(name, opts...) + if err != nil { + panic(err) + } + return m + } +} diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 8f5941ce84b1f..842f9bc0c8c62 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -12,12 +12,14 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" ) var _ processor.Metrics = (*Processor)(nil) @@ -35,7 +37,7 @@ type Processor struct { mtx sync.Mutex } -func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processor { +func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) proc := Processor{ @@ -47,6 +49,7 @@ func newProcessor(cfg *Config, log *zap.Logger, next 
consumer.Metrics) *Processo var dps streams.Map[data.Number] dps = delta.New[data.Number]() + dps = telemetry.Observe(dps, meter) if cfg.MaxStale > 0 { exp := streams.ExpireAfter(dps, cfg.MaxStale) From 26e5ec06e1939a6905f56f652db206d1ce9febd7 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 5 Mar 2024 17:09:28 +0100 Subject: [PATCH 02/18] deltatocumulative: metrics --- .../deltatocumulativeprocessor/README.md | 12 ++++++ .../internal/telemetry/metrics.go | 43 ++++++++++--------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index 1a639128fca86..6878bd5b2afae 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -28,3 +28,15 @@ processors: ``` There is no further configuration required. All delta samples are converted to cumulative. + +## Troubleshooting + +The following metrics are recorded when [telemetry is +enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry): + +| Name | Description | Unit | +|------------------------------------------|---------------------------------------------------------------------------------------|---------| +| `deltatocumulative_streams_count` | Number of streams currently tracked by the aggregation state | | +| `deltatocumulative_datapoints_processed` | Total number of datapoints processed, whether successful or not | | +| `deltatocumulative_datapoints_dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | | +| `deltatocumulative_seconds_lost` | Total length of all gaps in the streams, which occur e.g. 
due to lost in transit | seconds | diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index 0404363a97334..d4bff27f32a70 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package telemetry import ( @@ -6,19 +9,20 @@ import ( "reflect" "strings" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "go.opentelemetry.io/collector/processor/processorhelper" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) type Metrics struct { streams metric.Int64UpDownCounter total metric.Int64Counter dropped metric.Int64Counter - lost metric.Int64Counter + gaps metric.Int64Counter } func metrics(meter metric.Meter) Metrics { @@ -28,7 +32,7 @@ func metrics(meter metric.Meter) Metrics { ) return Metrics{ - streams: updown("streams", + streams: updown("streams.count", metric.WithDescription("number of streams tracked"), metric.WithUnit("{stream}"), ), @@ -40,7 +44,7 @@ func metrics(meter metric.Meter) Metrics { metric.WithDescription("number of dropped datapoints due to given 'reason'"), metric.WithUnit("{datapoint}"), ), - lost: 
count("lost", + gaps: count("gaps.length", metric.WithDescription("total duration where data was expected but not received"), metric.WithUnit("s"), ), @@ -58,7 +62,6 @@ var _ streams.Map[any] = (*Map[any])(nil) type Map[T any] struct { streams.Map[T] - Metrics } @@ -67,29 +70,29 @@ func (m *Map[T]) Store(id streams.Ident, v T) error { _, old := m.Load(id) var ( - olderStart *delta.ErrOlderStart - outOfOrder *delta.ErrOutOfOrder - gap *delta.ErrGap + olderStart delta.ErrOlderStart + outOfOrder delta.ErrOutOfOrder + gap delta.ErrGap ) err := m.Map.Store(id, v) switch { case err == nil: // all good - case errors.As(err, olderStart): + case errors.As(err, &olderStart): // non fatal. record but ignore - inc(m.dropped, reason(olderStart)) + inc(m.dropped, reason(&olderStart)) err = nil - case errors.As(err, outOfOrder): + case errors.As(err, &outOfOrder): // non fatal. record but ignore - inc(m.dropped, reason(outOfOrder)) + inc(m.dropped, reason(&outOfOrder)) err = nil - case errors.As(err, gap): - // a gap occured. record its length, but ignore + case errors.As(err, &gap): + // a gap occurred. record its length, but ignore from := gap.From.AsTime() to := gap.To.AsTime() lost := to.Sub(from).Seconds() - m.lost.Add(nil, int64(lost)) + m.gaps.Add(context.TODO(), int64(lost)) err = nil } @@ -110,14 +113,14 @@ type addable[Opts any] interface { } func inc[A addable[O], O any](a A, opts ...O) { - a.Add(nil, 1, opts...) + a.Add(context.TODO(), 1, opts...) } func dec[A addable[O], O any](a A, opts ...O) { - a.Add(nil, -1, opts...) + a.Add(context.TODO(), -1, opts...) 
} -func reason[E error](err *E) metric.AddOption { +func reason[E error](_ *E) metric.AddOption { reason := reflect.TypeOf(*new(E)).Name() reason = strings.TrimPrefix(reason, "Err") return metric.WithAttributes(attribute.String("reason", reason)) From 8a75942bd1a974508002c8094f0709e2868f9140 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Wed, 21 Feb 2024 15:57:43 +0100 Subject: [PATCH 03/18] *: changelog --- .chloggen/deltatocumulative-metrics.yaml | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/deltatocumulative-metrics.yaml diff --git a/.chloggen/deltatocumulative-metrics.yaml b/.chloggen/deltatocumulative-metrics.yaml new file mode 100644 index 0000000000000..df001fda6a663 --- /dev/null +++ b/.chloggen/deltatocumulative-metrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulativeprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: self-instrumentation to observe key metrics of the stream accumulation + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30705] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. 
'[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 9346ce5b8a434ac4f9c5a80aaec94ca2ace7d801 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 5 Mar 2024 17:38:33 +0100 Subject: [PATCH 04/18] *: goporto --- .../deltatocumulativeprocessor/internal/telemetry/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index d4bff27f32a70..f89026e1abcb0 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package telemetry +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" import ( "context" From a23bbb37a6f0cc1f6b782bcd55301e8cd6c63ccf Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 5 Mar 2024 19:36:17 +0100 Subject: [PATCH 05/18] *: go mod tidy --- processor/deltatocumulativeprocessor/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index c7fc08ab881a2..cff2977e6ca3b 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -10,6 +10,7 @@ require ( go.opentelemetry.io/collector/consumer v0.96.0 go.opentelemetry.io/collector/pdata v1.3.0 go.opentelemetry.io/collector/processor v0.96.0 + go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/zap v1.27.0 @@ -40,7 +41,6 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect go.opentelemetry.io/collector v0.96.0 // 
indirect go.opentelemetry.io/collector/config/configtelemetry v0.96.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect From e5aecc6f1086acef2d7cb91791675e0d92e3f70e Mon Sep 17 00:00:00 2001 From: sh0rez Date: Wed, 6 Mar 2024 15:07:42 +0100 Subject: [PATCH 06/18] *: s/TODO/Background --- .../deltatocumulativeprocessor/internal/telemetry/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index f89026e1abcb0..8f699eebc64ff 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -113,11 +113,11 @@ type addable[Opts any] interface { } func inc[A addable[O], O any](a A, opts ...O) { - a.Add(context.TODO(), 1, opts...) + a.Add(context.Background(), 1, opts...) } func dec[A addable[O], O any](a A, opts ...O) { - a.Add(context.TODO(), -1, opts...) + a.Add(context.Background(), -1, opts...) 
} func reason[E error](_ *E) metric.AddOption { From ef22fb4354eb6c9cd2ec2d9e41e0e27593dc227f Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 11 Mar 2024 15:21:16 +0100 Subject: [PATCH 07/18] maybe: optional pointers --- .../internal/maybe/option.go | 22 +++++++++++++++++++ .../deltatocumulativeprocessor/processor.go | 16 ++++++++------ 2 files changed, 31 insertions(+), 7 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/maybe/option.go diff --git a/processor/deltatocumulativeprocessor/internal/maybe/option.go b/processor/deltatocumulativeprocessor/internal/maybe/option.go new file mode 100644 index 0000000000000..f2b25ab660ea8 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/maybe/option.go @@ -0,0 +1,22 @@ +package maybe + +// Ptr is a pointer that points to something that is not guaranteed to exist. +// Any use must "Try()" accessing the underlying value, enforcing checking the +// ok return value too. +// This provides a clear distinction between "not set" and "set to nil" +type Ptr[T any] struct { + to *T + ok bool +} + +func None[T any]() Ptr[T] { + return Ptr[T]{to: nil, ok: false} +} + +func Some[T any](ptr *T) Ptr[T] { + return Ptr[T]{to: ptr, ok: true} +} + +func (ptr Ptr[T]) Try() (_ *T, ok bool) { + return ptr.to, ptr.ok +} diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 1b89434ce7562..eb91ddefda857 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" @@ -34,7 +35,7 @@ type Processor struct { cancel context.CancelFunc aggr streams.Aggregator[data.Number] - stale *staleness.Staleness[data.Number] + stale maybe.Ptr[staleness.Staleness[data.Number]] mtx sync.Mutex } @@ -54,14 +55,14 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume dps = telemetry.Observe(dps, meter) if cfg.MaxStale > 0 { - stale := staleness.NewStaleness(cfg.MaxStale, dps) + stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) proc.stale = stale - dps = stale + dps, _ = stale.Try() } if cfg.MaxStreams > 0 { lim := streams.Limit(dps, cfg.MaxStreams) - if proc.stale != nil { - lim.Evictor = proc.stale + if stale, ok := proc.stale.Try(); ok { + lim.Evictor = stale } dps = lim } @@ -71,7 +72,8 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume } func (p *Processor) Start(_ context.Context, _ component.Host) error { - if p.stale == nil { + stale, ok := p.stale.Try() + if !ok { return nil } @@ -83,7 +85,7 @@ func (p *Processor) Start(_ context.Context, _ component.Host) error { return case <-tick.C: p.mtx.Lock() - p.stale.ExpireOldEntries() + stale.ExpireOldEntries() p.mtx.Unlock() } } From 0d49f76bfbaf5aa335abc201694146e60fdb9049 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 11 Mar 2024 16:05:31 +0100 Subject: [PATCH 08/18] *: limit metrics --- .../internal/streams/limit.go | 4 - .../internal/telemetry/metrics.go | 91 +++++++++++++------ .../deltatocumulativeprocessor/processor.go | 5 +- 3 files changed, 69 insertions(+), 31 
deletions(-) diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 0378960ee6573..52cbd322cb9e6 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -57,7 +57,3 @@ type ErrEvicted struct { func (e ErrEvicted) Error() string { return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id) } - -func (e ErrEvicted) Unwrap() error { - return e.ErrLimit -} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index 8f699eebc64ff..bbdd01d37520c 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -6,8 +6,6 @@ package telemetry // import "github.com/open-telemetry/opentelemetry-collector-c import ( "context" "errors" - "reflect" - "strings" "go.opentelemetry.io/collector/processor/processorhelper" "go.opentelemetry.io/otel/attribute" @@ -18,32 +16,56 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) -type Metrics struct { - streams metric.Int64UpDownCounter +type Streams struct { + tracked metric.Int64UpDownCounter + limit metric.Int64ObservableGauge + evicted metric.Int64Counter +} + +type Datapoints struct { total metric.Int64Counter dropped metric.Int64Counter - gaps metric.Int64Counter +} + +type Metrics struct { + streams Streams + dps Datapoints + + gaps metric.Int64Counter } func metrics(meter metric.Meter) Metrics { var ( count = use(meter.Int64Counter) updown = use(meter.Int64UpDownCounter) + gauge = use(meter.Int64ObservableGauge) ) return Metrics{ - streams: updown("streams.count", - metric.WithDescription("number of streams tracked"), - metric.WithUnit("{stream}"), - ), - total: 
count("datapoints.processed", - metric.WithDescription("number of datapoints processed"), - metric.WithUnit("{datapoint}"), - ), - dropped: count("datapoints.dropped", - metric.WithDescription("number of dropped datapoints due to given 'reason'"), - metric.WithUnit("{datapoint}"), - ), + streams: Streams{ + tracked: updown("streams.tracked", + metric.WithDescription("number of streams tracked"), + metric.WithUnit("{stream}"), + ), + limit: gauge("streams.limit", + metric.WithDescription("upper limit of tracked streams"), + metric.WithUnit("{stream}"), + ), + evicted: count("streams.evicted", + metric.WithDescription("number of streams evicted"), + metric.WithUnit("{stream}"), + ), + }, + dps: Datapoints{ + total: count("datapoints.processed", + metric.WithDescription("number of datapoints processed"), + metric.WithUnit("{datapoint}"), + ), + dropped: count("datapoints.dropped", + metric.WithDescription("number of dropped datapoints due to given 'reason'"), + metric.WithUnit("{datapoint}"), + ), + }, gaps: count("gaps.length", metric.WithDescription("total duration where data was expected but not received"), metric.WithUnit("s"), @@ -51,7 +73,18 @@ func metrics(meter metric.Meter) Metrics { } } -func Observe[T any](items streams.Map[T], meter metric.Meter) streams.Map[T] { +func (m Metrics) WithLimit(meter metric.Meter, max int64) { + then := metric.Callback(func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(m.streams.limit, max) + return nil + }) + _, err := meter.RegisterCallback(then, m.streams.limit) + if err != nil { + panic(err) + } +} + +func Observe[T any](items streams.Map[T], meter metric.Meter) *Map[T] { return &Map[T]{ Map: items, Metrics: metrics(meter), @@ -66,13 +99,15 @@ type Map[T any] struct { } func (m *Map[T]) Store(id streams.Ident, v T) error { - inc(m.total) + inc(m.dps.total) _, old := m.Load(id) var ( olderStart delta.ErrOlderStart outOfOrder delta.ErrOutOfOrder gap delta.ErrGap + limit streams.ErrLimit + evict 
streams.ErrEvicted ) err := m.Map.Store(id, v) @@ -81,11 +116,17 @@ func (m *Map[T]) Store(id streams.Ident, v T) error { // all good case errors.As(err, &olderStart): // non fatal. record but ignore - inc(m.dropped, reason(&olderStart)) + inc(m.dps.dropped, reason("older-start")) err = nil case errors.As(err, &outOfOrder): // non fatal. record but ignore - inc(m.dropped, reason(&outOfOrder)) + inc(m.dps.dropped, reason("out-of-order")) + err = nil + case errors.As(err, &evict): + inc(m.streams.evicted) + err = nil + case errors.As(err, &limit): + inc(m.dps.dropped, reason("stream-limit")) err = nil case errors.As(err, &gap): // a gap occurred. record its length, but ignore @@ -98,13 +139,13 @@ func (m *Map[T]) Store(id streams.Ident, v T) error { // not dropped and not seen before => new stream if err == nil && !old { - inc(m.streams) + inc(m.streams.tracked) } return err } func (m *Map[T]) Delete(id streams.Ident) { - dec(m.streams) + dec(m.streams.tracked) m.Map.Delete(id) } @@ -120,9 +161,7 @@ func dec[A addable[O], O any](a A, opts ...O) { a.Add(context.Background(), -1, opts...) 
} -func reason[E error](_ *E) metric.AddOption { - reason := reflect.TypeOf(*new(E)).Name() - reason = strings.TrimPrefix(reason, "Err") +func reason(reason string) metric.AddOption { return metric.WithAttributes(attribute.String("reason", reason)) } diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index eb91ddefda857..6c7d1f69ea77f 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -52,7 +52,6 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume var dps streams.Map[data.Number] dps = delta.New[data.Number]() - dps = telemetry.Observe(dps, meter) if cfg.MaxStale > 0 { stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) @@ -67,6 +66,10 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume dps = lim } + tel := telemetry.Observe(dps, meter) + tel.WithLimit(meter, int64(cfg.MaxStreams)) + dps = tel + proc.aggr = streams.IntoAggregator(dps) return &proc } From 103f5f37a9a0bd21eb9be4769c7b51dbaa1cc150 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 11 Mar 2024 16:15:08 +0100 Subject: [PATCH 09/18] *: update readme --- processor/deltatocumulativeprocessor/README.md | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index 5873eef45dcda..fe281431ba7ca 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -30,6 +30,8 @@ processors: # will be dropped [ max_streams: | default = 0 (off) ] +``` + There is no further configuration required. All delta samples are converted to cumulative. ## Troubleshooting @@ -37,9 +39,11 @@ There is no further configuration required. 
All delta samples are converted to c The following metrics are recorded when [telemetry is enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry): -| Name | Description | Unit | -|------------------------------------------|---------------------------------------------------------------------------------------|---------| -| `deltatocumulative_streams_count` | Number of streams currently tracked by the aggregation state | | -| `deltatocumulative_datapoints_processed` | Total number of datapoints processed, whether successful or not | | -| `deltatocumulative_datapoints_dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | | -| `deltatocumulative_seconds_lost` | Total length of all gaps in the streams, which occur e.g. due to lost in transit | seconds | +| Name | Description | Unit | +|------------------------------------------|---------------------------------------------------------------------------------------|---------------| +| `deltatocumulative.streams.tracked` | Number of streams currently tracked by the aggregation state | `{stream}` | +| `deltatocumulative.streams.limit` | Upper limit of tracked streams | `{stream}` | +| `deltatocumulative.streams.evicted` | Number of streams removed from tracking to ingest newer streams | `{stream}` | +| `deltatocumulative.datapoints.processed` | Total number of datapoints processed, whether successful or not | `{datapoint}` | +| `deltatocumulative.datapoints.dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | `{datapoint}` | +| `deltatocumulative.gaps.length` | Total length of all gaps in the streams, which occur e.g. 
due to lost in transit | `second` | From 422c0f4dd4c2c4b8484d58eefcd9cf00dbc6aa53 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 18 Mar 2024 11:06:26 +0100 Subject: [PATCH 10/18] telemetry: split items and errs These need to sit at different stages in the pipeline --- .../internal/streams/limit.go | 2 +- .../internal/telemetry/metrics.go | 92 ++++++++++++------- .../deltatocumulativeprocessor/processor.go | 8 +- 3 files changed, 63 insertions(+), 39 deletions(-) diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 52cbd322cb9e6..406b8f41291bb 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -23,7 +23,7 @@ type LimitMap[T any] struct { } func (m LimitMap[T]) Store(id identity.Stream, v T) error { - if m.Map.Len() < m.Max { + if m.Map.Len() <= m.Max { return m.Map.Store(id, v) } diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index bbdd01d37520c..869a94d740460 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -16,6 +16,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) +type Telemetry struct { + Metrics +} + +func New(meter metric.Meter) Telemetry { + return Telemetry{ + Metrics: metrics(meter), + } +} + type Streams struct { tracked metric.Int64UpDownCounter limit metric.Int64ObservableGauge @@ -84,24 +94,48 @@ func (m Metrics) WithLimit(meter metric.Meter, max int64) { } } -func Observe[T any](items streams.Map[T], meter metric.Meter) *Map[T] { - return &Map[T]{ +func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] { + return Items[T]{ Map: items, - Metrics: 
metrics(meter), + Metrics: metrics, } } -var _ streams.Map[any] = (*Map[any])(nil) +func ObserveNonFatal[T any](items streams.Map[T], metrics *Metrics) Faults[T] { + return Faults[T]{ + Map: items, + Metrics: metrics, + } +} -type Map[T any] struct { +type Items[T any] struct { streams.Map[T] - Metrics + *Metrics +} + +func (i Items[T]) Store(id streams.Ident, v T) error { + inc(i.dps.total) + + _, old := i.Map.Load(id) + err := i.Map.Store(id, v) + if err == nil && !old { + inc(i.streams.tracked) + } + + return err } -func (m *Map[T]) Store(id streams.Ident, v T) error { - inc(m.dps.total) - _, old := m.Load(id) +func (i Items[T]) Delete(id streams.Ident) { + dec(i.streams.tracked) + i.Map.Delete(id) +} + +type Faults[T any] struct { + streams.Map[T] + *Metrics +} +func (f Faults[T]) Store(id streams.Ident, v T) error { var ( olderStart delta.ErrOlderStart outOfOrder delta.ErrOutOfOrder @@ -110,44 +144,32 @@ func (m *Map[T]) Store(id streams.Ident, v T) error { evict streams.ErrEvicted ) - err := m.Map.Store(id, v) + err := f.Map.Store(id, v) switch { - case err == nil: - // all good + default: + return err case errors.As(err, &olderStart): - // non fatal. record but ignore - inc(m.dps.dropped, reason("older-start")) - err = nil + inc(f.dps.dropped, reason("older-start")) case errors.As(err, &outOfOrder): - // non fatal. record but ignore - inc(m.dps.dropped, reason("out-of-order")) - err = nil - case errors.As(err, &evict): - inc(m.streams.evicted) - err = nil + inc(f.dps.dropped, reason("out-of-order")) case errors.As(err, &limit): - inc(m.dps.dropped, reason("stream-limit")) - err = nil + inc(f.dps.dropped, reason("stream-limit")) + case errors.As(err, &evict): + inc(f.streams.evicted) case errors.As(err, &gap): - // a gap occurred. 
record its length, but ignore from := gap.From.AsTime() to := gap.To.AsTime() lost := to.Sub(from).Seconds() - m.gaps.Add(context.TODO(), int64(lost)) - err = nil + f.gaps.Add(context.TODO(), int64(lost)) } - // not dropped and not seen before => new stream - if err == nil && !old { - inc(m.streams.tracked) - } - return err + return nil } -func (m *Map[T]) Delete(id streams.Ident) { - dec(m.streams.tracked) - m.Map.Delete(id) -} +var ( + _ streams.Map[any] = (*Items[any])(nil) + _ streams.Map[any] = (*Faults[any])(nil) +) type addable[Opts any] interface { Add(context.Context, int64, ...Opts) diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 6c7d1f69ea77f..89a7185031ecb 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -50,8 +50,11 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume next: next, } + tel := telemetry.New(meter) + var dps streams.Map[data.Number] dps = delta.New[data.Number]() + dps = telemetry.ObserveItems(dps, &tel.Metrics) if cfg.MaxStale > 0 { stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) @@ -59,6 +62,7 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume dps, _ = stale.Try() } if cfg.MaxStreams > 0 { + tel.WithLimit(meter, int64(cfg.MaxStreams)) lim := streams.Limit(dps, cfg.MaxStreams) if stale, ok := proc.stale.Try(); ok { lim.Evictor = stale @@ -66,9 +70,7 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume dps = lim } - tel := telemetry.Observe(dps, meter) - tel.WithLimit(meter, int64(cfg.MaxStreams)) - dps = tel + dps = telemetry.ObserveNonFatal(dps, &tel.Metrics) proc.aggr = streams.IntoAggregator(dps) return &proc From 373b9a0386cedaa0a093a6844462a4fc2495bf52 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 18 Mar 2024 11:19:54 +0100 Subject: [PATCH 11/18] *: addlicense --- 
processor/deltatocumulativeprocessor/internal/maybe/option.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/processor/deltatocumulativeprocessor/internal/maybe/option.go b/processor/deltatocumulativeprocessor/internal/maybe/option.go index f2b25ab660ea8..af187b238594b 100644 --- a/processor/deltatocumulativeprocessor/internal/maybe/option.go +++ b/processor/deltatocumulativeprocessor/internal/maybe/option.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package maybe // Ptr is a pointer that points to something that is not guaranteed to exist. From 21699d8503fe38f1a1da17d6ebbcfab4f8930854 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 25 Mar 2024 11:51:01 +0100 Subject: [PATCH 12/18] streams: permit write to existing at limit --- .../deltatocumulativeprocessor/internal/streams/limit.go | 4 +++- .../internal/streams/limit_test.go | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 406b8f41291bb..23dd137cafa14 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -23,7 +23,9 @@ type LimitMap[T any] struct { } func (m LimitMap[T]) Store(id identity.Stream, v T) error { - if m.Map.Len() <= m.Max { + _, ok := m.Map.Load(id) + avail := m.Map.Len() < m.Max + if ok || avail { return m.Map.Store(id, v) } diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go index d0c5af6e56662..04ffffbde5f57 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go @@ -22,11 +22,13 @@ func TestLimit(t *testing.T) { lim := streams.Limit(items, 10) ids := make([]identity.Stream, 10) + dps := 
make([]data.Number, 10) // write until limit must work for i := 0; i < 10; i++ { id, dp := sum.Stream() ids[i] = id + dps[i] = dp err := lim.Store(id, dp) require.NoError(t, err) } @@ -40,6 +42,12 @@ func TestLimit(t *testing.T) { require.True(t, streams.AtLimit(err)) } + // write to existing must work + { + err := lim.Store(ids[3], dps[3]) + require.NoError(t, err) + } + // after removing one, must be accepted again { lim.Delete(ids[0]) From e27c8f788753c6a2e8f967c83bc596674d7df6a2 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 25 Mar 2024 12:33:31 +0100 Subject: [PATCH 13/18] *: goporto --- processor/deltatocumulativeprocessor/internal/maybe/option.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/deltatocumulativeprocessor/internal/maybe/option.go b/processor/deltatocumulativeprocessor/internal/maybe/option.go index af187b238594b..0cd039ccdd85f 100644 --- a/processor/deltatocumulativeprocessor/internal/maybe/option.go +++ b/processor/deltatocumulativeprocessor/internal/maybe/option.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package maybe +package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" // Ptr is a pointer that points to something that is not guaranteed to exist. 
// Any use must "Try()" accessing the underlying value, enforcing checking the From ecc88a862b39634225a8f6da360bb01bb8ecdcb2 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Wed, 3 Apr 2024 18:01:13 +0200 Subject: [PATCH 14/18] maybe: godoc, tests, example --- .../internal/maybe/option.go | 25 -------- .../internal/maybe/ptr.go | 52 ++++++++++++++++ .../internal/maybe/ptr_test.go | 60 +++++++++++++++++++ 3 files changed, 112 insertions(+), 25 deletions(-) delete mode 100644 processor/deltatocumulativeprocessor/internal/maybe/option.go create mode 100644 processor/deltatocumulativeprocessor/internal/maybe/ptr.go create mode 100644 processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go diff --git a/processor/deltatocumulativeprocessor/internal/maybe/option.go b/processor/deltatocumulativeprocessor/internal/maybe/option.go deleted file mode 100644 index 0cd039ccdd85f..0000000000000 --- a/processor/deltatocumulativeprocessor/internal/maybe/option.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" - -// Ptr is a pointer that points to something that is not guaranteed to exist. -// Any use must "Try()" accessing the underlying value, enforcing checking the -// ok return value too. 
-// This provides a clear distinction between "not set" and "set to nil" -type Ptr[T any] struct { - to *T - ok bool -} - -func None[T any]() Ptr[T] { - return Ptr[T]{to: nil, ok: false} -} - -func Some[T any](ptr *T) Ptr[T] { - return Ptr[T]{to: ptr, ok: true} -} - -func (ptr Ptr[T]) Try() (_ *T, ok bool) { - return ptr.to, ptr.ok -} diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr.go new file mode 100644 index 0000000000000..ee7da576f4f49 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/maybe/ptr.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package maybe provides utilities for representing data that may or may not exist at +// runtime in a safe way. +// +// A typical approach to this are pointers, but they suffer from two issues: +// - Unsafety: permitting nil pointers must require careful checking on each use, +// which is easily forgotten +// - Blindness: nil itself cannot differentiate between "set to nil" and +// "not set at all", leading to unexpected edge cases +// +// The [Ptr] type of this package provides a safe alternative with a clear +// distinction between "not set" and "set to nil". +package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" + +// Ptr references some value of type T that is not guaranteed to exist. +// Callers must use [Ptr.Try] to access the underlying value, checking the +// ok return value too. +// This provides a clear distinction between "not set" and "set to nil". +// +// Use [Some] and [None] to create Ptrs. +type Ptr[T any] struct { + to *T + ok bool +} + +// None returns a Ptr that represents "not-set". +// This is equal to a zero-value Ptr. +func None[T any]() Ptr[T] { + return Ptr[T]{to: nil, ok: false} +} + +// Some returns a pointer to the passed T. 
+// +// The ptr argument may be nil, in which case this represents "explictely set to +// nil". +func Some[T any](ptr *T) Ptr[T] { + return Ptr[T]{to: ptr, ok: true} +} + +// Try attempts to de-reference the Ptr, giving one of three results: +// +// - nil, false: not-set +// - nil, true: explicitely set to nil +// - non-nil, true: set to some value +// +// This provides extra safety over bare pointers, because callers are forced by +// the compiler to either check or explicitely ignore the ok value. +func (ptr Ptr[T]) Try() (_ *T, ok bool) { + return ptr.to, ptr.ok +} diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go new file mode 100644 index 0000000000000..2700db93b8287 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go @@ -0,0 +1,60 @@ +package maybe_test + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" + "github.com/stretchr/testify/require" +) + +func TestMaybe(t *testing.T) { + t.Run("zero-not-ok", func(t *testing.T) { + var ptr maybe.Ptr[int] + _, ok := ptr.Try() + require.False(t, ok) + }) + t.Run("none-not-ok", func(t *testing.T) { + ptr := maybe.None[int]() + _, ok := ptr.Try() + require.False(t, ok) + }) + t.Run("explicit-nil", func(t *testing.T) { + ptr := maybe.Some[int](nil) + v, ok := ptr.Try() + require.Nil(t, v) + require.True(t, ok) + }) + t.Run("value", func(t *testing.T) { + num := 42 + ptr := maybe.Some(&num) + v, ok := ptr.Try() + require.True(t, ok) + require.Equal(t, num, *v) + }) +} + +func ExamplePtr() { + var unset maybe.Ptr[int] // = maybe.None() + if v, ok := unset.Try(); ok { + fmt.Println("unset:", v) + } else { + fmt.Println("unset: !ok") + } + + var xnil maybe.Ptr[int] = maybe.Some[int](nil) + if v, ok := xnil.Try(); ok { + fmt.Println("explicit nil:", v) + } + + num := 42 + var set maybe.Ptr[int] = 
maybe.Some(&num) + if v, ok := set.Try(); ok { + fmt.Println("set:", *v) + } + + // Output: + // unset: !ok + // explicit nil: + // set: 42 +} From fb77807936370375a6a6c693c6db04134a9a5dfc Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 8 Apr 2024 14:47:39 +0200 Subject: [PATCH 15/18] telemetry: TestFaults --- .../internal/maybe/ptr.go | 6 +- .../internal/maybe/ptr_test.go | 6 +- .../internal/streams/limit.go | 6 +- .../internal/telemetry/faults_test.go | 150 ++++++++++++++++++ 4 files changed, 161 insertions(+), 7 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr.go index ee7da576f4f49..8f40b8d277b31 100644 --- a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go +++ b/processor/deltatocumulativeprocessor/internal/maybe/ptr.go @@ -33,7 +33,7 @@ func None[T any]() Ptr[T] { // Some returns a pointer to the passed T. // -// The ptr argument may be nil, in which case this represents "explictely set to +// The ptr argument may be nil, in which case this represents "explicitly set to // nil". func Some[T any](ptr *T) Ptr[T] { return Ptr[T]{to: ptr, ok: true} @@ -42,11 +42,11 @@ func Some[T any](ptr *T) Ptr[T] { // Try attempts to de-reference the Ptr, giving one of three results: // // - nil, false: not-set -// - nil, true: explicitely set to nil +// - nil, true: explicitly set to nil // - non-nil, true: set to some value // // This provides extra safety over bare pointers, because callers are forced by -// the compiler to either check or explicitely ignore the ok value. +// the compiler to either check or explicitly ignore the ok value. 
func (ptr Ptr[T]) Try() (_ *T, ok bool) { return ptr.to, ptr.ok } diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go index 2700db93b8287..c32c34e7e5057 100644 --- a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go +++ b/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go @@ -1,11 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package maybe_test import ( "fmt" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" ) func TestMaybe(t *testing.T) { diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 23dd137cafa14..3e021b6d5d74a 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -35,7 +35,7 @@ func (m LimitMap[T]) Store(id identity.Stream, v T) error { if err := m.Map.Store(id, v); err != nil { return err } - return ErrEvicted{ErrLimit: errl, id: gone} + return ErrEvicted{ErrLimit: errl, Ident: gone} } return errl } @@ -53,9 +53,9 @@ func AtLimit(err error) bool { type ErrEvicted struct { ErrLimit - id Ident + Ident Ident } func (e ErrEvicted) Error() string { - return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id) + return fmt.Sprintf("%s. 
evicted stream %s", e.ErrLimit, e.Ident) } diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go new file mode 100644 index 0000000000000..4809d74fda042 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -0,0 +1,150 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/otel/metric/noop" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +// TestFaults verifies certain non-fatal errors are actually caused and +// subsequently dropped. It does so by writing bad samples to the actual +// implementation instead of fabricating errors manually. 
+func TestFaults(t *testing.T) { + type Map = streams.Map[data.Number] + type Case struct { + Name string + Map Map + Pre func(Map, identity.Stream, data.Number) error + Bad func(Map, identity.Stream, data.Number) error + Err error + } + + sum := random.Sum() + evid, evdp := sum.Stream() + + cases := []Case{ + { + Name: "older-start", + Pre: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetStartTimestamp(ts(20)) + dp.SetTimestamp(ts(30)) + return dps.Store(id, dp) + }, + Bad: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetStartTimestamp(ts(10)) + dp.SetTimestamp(ts(40)) + return dps.Store(id, dp) + }, + Err: delta.ErrOlderStart{Start: ts(20), Sample: ts(10)}, + }, + { + Name: "out-of-order", + Pre: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetTimestamp(ts(20)) + return dps.Store(id, dp) + }, + Bad: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetTimestamp(ts(10)) + return dps.Store(id, dp) + }, + Err: delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)}, + }, + { + Name: "gap", + Pre: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetStartTimestamp(ts(10)) + dp.SetTimestamp(ts(20)) + return dps.Store(id, dp) + }, + Bad: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetStartTimestamp(ts(30)) + dp.SetTimestamp(ts(40)) + return dps.Store(id, dp) + }, + Err: delta.ErrGap{From: ts(20), To: ts(30)}, + }, + { + Name: "limit", + Map: streams.Limit(delta.New[data.Number](), 1), + Pre: func(dps Map, id identity.Stream, dp data.Number) error { + dp.SetTimestamp(ts(10)) + return dps.Store(id, dp) + }, + Bad: func(dps Map, _ identity.Stream, _ data.Number) error { + id, dp := sum.Stream() + dp.SetTimestamp(ts(20)) + return dps.Store(id, dp) + }, + Err: streams.ErrLimit(1), + }, + { + Name: "evict", + Map: func() Map { + ev := HeadEvictor[data.Number]{Map: delta.New[data.Number]()} + lim := streams.Limit(ev, 1) + lim.Evictor = ev + return lim + }(), + Pre: func(dps Map, _ 
identity.Stream, _ data.Number) error { + evdp.SetTimestamp(ts(10)) + return dps.Store(evid, evdp) + }, + Bad: func(dps Map, _ identity.Stream, _ data.Number) error { + id, dp := sum.Stream() + dp.SetTimestamp(ts(20)) + return dps.Store(id, dp) + }, + Err: streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)}, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + id, dp := sum.Stream() + tel := telemetry.New(noop.Meter{}) + + dps := c.Map + if dps == nil { + dps = delta.New[data.Number]() + } + onf := telemetry.ObserveNonFatal(dps, &tel.Metrics) + + if c.Pre != nil { + err := c.Pre(onf, id, dp.Clone()) + require.NoError(t, err) + } + + err := c.Bad(dps, id, dp.Clone()) + require.Equal(t, c.Err, err) + + err = c.Bad(onf, id, dp.Clone()) + require.NoError(t, err) + }) + } +} + +type ts = pcommon.Timestamp + +// HeadEvictor drops the first stream on Evict() +type HeadEvictor[T any] struct{ streams.Map[T] } + +func (e HeadEvictor[T]) Evict() (evicted identity.Stream) { + e.Items()(func(id identity.Stream, _ T) bool { + e.Delete(id) + evicted = id + return false + }) + return evicted +} From f9b8fa1a0b6c3f11c3766ec67e1879c4484bf3ed Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 9 Apr 2024 15:55:46 +0200 Subject: [PATCH 16/18] deltatocumulative: better explain ErrGap --- processor/deltatocumulativeprocessor/internal/delta/delta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index a747d61419a1c..5539eb8c8e499 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -80,5 +80,5 @@ type ErrGap struct { } func (e ErrGap) Error() string { - return fmt.Sprintf("gap in stream from %s to %s", e.From, e.To) + return fmt.Sprintf("gap in stream from %s to %s. 
samples were likely lost in transit", e.From, e.To) } From c5f34743d4fef791dae5d3839e3af1f56a1da06a Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 9 Apr 2024 23:44:22 +0200 Subject: [PATCH 17/18] *: bump collector mod --- processor/deltatocumulativeprocessor/go.mod | 2 +- processor/deltatocumulativeprocessor/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 14ee604a3c59d..e41c158281333 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -40,7 +40,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect - go.opentelemetry.io/collector v0.97.1-0.20240404121116-4f1a8936d26b // indirect + go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/pdata/testdata v0.0.0-20240408153657-fc289290613a // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect diff --git a/processor/deltatocumulativeprocessor/go.sum b/processor/deltatocumulativeprocessor/go.sum index bbd7b3b52a45a..02b12ab515393 100644 --- a/processor/deltatocumulativeprocessor/go.sum +++ b/processor/deltatocumulativeprocessor/go.sum @@ -64,8 +64,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/collector v0.97.1-0.20240404121116-4f1a8936d26b h1:AtFiwRf8juWFUzVnFUMpDMKJ4YkW6wcVxixHmHUiX4I= -go.opentelemetry.io/collector 
v0.97.1-0.20240404121116-4f1a8936d26b/go.mod h1:w3KXCBoPBDBLr3Tm2w8OwHRSekhV7b21v2c7IFdvtME= +go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 h1:2t5axZqKOQWX4n4tHzVOsFvv6JddzYcYPtnpm90yQ88= +go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:tqJMDpvR9AgWwvfUiQnvYBfOc2jCQdmA700G92iU1to= go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4 h1:47QO6HD8Ts3w9fAV0Qf3lIH8RFP1vXK1xEPAcL7FYkE= go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:um+Bn0rshAr3diL1p+GCpw3cRxENFYMNIPBL6Hl8Pok= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 h1:pgNIGcQNf2rAI7qtUV7UGk+4D8RD/m8CeoU/Sv5qvIM= From 467696fe09990239a6ace5ead0b47f63f4cc112e Mon Sep 17 00:00:00 2001 From: sh0rez Date: Wed, 10 Apr 2024 13:43:34 +0200 Subject: [PATCH 18/18] *: empty to trigger ci