Skip to content

Commit 0363169

Browse files
committed
Clarify naming and documentation
1 parent f295494 commit 0363169

File tree

4 files changed

+121
-49
lines changed

4 files changed

+121
-49
lines changed

sdk/metric/internal/aggregator.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ var now = time.Now
2727

2828
// Aggregator forms an aggregation from a collection of recorded measurements.
2929
//
30-
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
31-
// it creates them for multiple views.
30+
// Aggregators need to be comparable so they can be de-duplicated by the SDK
31+
// when it creates them for multiple views.
3232
type Aggregator[N int64 | float64] interface {
3333
// Aggregate records the measurement, scoped by attr, and aggregates it
3434
// into an aggregation.
@@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface {
3838
// measurements made and ends an aggregation cycle.
3939
Aggregation() metricdata.Aggregation
4040
}
41+
42+
// precomputeAggregator is an Aggregator that recieves values to aggregate that
43+
// have been pre-computed by the caller.
44+
type precomputeAggregator[N int64 | float64] interface {
45+
// The Aggregate method of the embedded Aggregator is used to record
46+
// pre-computed measurements, scoped by attributes that have not been
47+
// filtered by an attribute filter.
48+
Aggregator[N]
49+
50+
// aggregateFiltered records measurements scoped by attributes that have
51+
// been filtered by an attribute filter.
52+
//
53+
// Pre-computed measurements of filtered attributes need to be recorded
54+
// separate from those that haven't been filtered so they can be added to
55+
// the non-filtered pre-computed measurements in a collection cycle and
56+
// then resest after the cycle (the non-filtered pre-computed measurements
57+
// are not reset).
58+
aggregateFiltered(N, attribute.Set)
59+
}

sdk/metric/internal/filter.go

+33-18
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,26 @@ import (
2121
"go.opentelemetry.io/otel/sdk/metric/metricdata"
2222
)
2323

24-
type filterAgg[N int64 | float64] interface {
25-
Aggregator[N]
26-
27-
// filtered records values for attributes that have been filtered.
28-
filtered(N, attribute.Set)
29-
}
30-
31-
// NewFilter wraps an Aggregator with an attribute filtering function.
24+
// NewFilter returns an Aggregator that wraps an agg with an attribute
25+
// filtering function. Both pre-computed non-pre-computed Aggregators can be
26+
// passed for agg. An appropriate Aggregator will be returned for the detected
27+
// type.
3228
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
3329
if fn == nil {
3430
return agg
3531
}
36-
if fa, ok := agg.(filterAgg[N]); ok {
32+
if fa, ok := agg.(precomputeAggregator[N]); ok {
3733
return newPrecomputedFilter(fa, fn)
3834
}
3935
return newFilter(agg, fn)
4036
}
4137

42-
// filter is an aggregator that applies attribute filter when Aggregating. filters
43-
// do not have any backing memory, and must be constructed with a backing Aggregator.
38+
// filter wraps an aggregator with an attribute filter. All recorded
39+
// measurements will have their attributes filtered before they are passed to
40+
// the underlying aggregator's Aggregate method.
41+
//
42+
// This should not be used to wrap a pre-computed Aggregator. Use a
43+
// precomputedFilter instead.
4444
type filter[N int64 | float64] struct {
4545
filter attribute.Filter
4646
aggregator Aggregator[N]
@@ -49,6 +49,11 @@ type filter[N int64 | float64] struct {
4949
seen map[attribute.Set]attribute.Set
5050
}
5151

52+
// newFilter returns an filter Aggregator that wraps agg with the attribute
53+
// filter fn.
54+
//
55+
// This should not be used to wrap a pre-computed Aggregator. Use a
56+
// precomputedFilter instead.
5257
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
5358
return &filter[N]{
5459
filter: fn,
@@ -78,25 +83,33 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation {
7883
}
7984

8085
// precomputedFilter is an aggregator that applies attribute filter when
81-
// Aggregating for precomputed Aggregations. The precomputed Aggregations need
82-
// to operate normally when no attribute filtering is done (for sums this means
83-
// setting the value), but when attribute filtering is done it needs to be
84-
// added to any set value.
86+
// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
87+
// need to operate normally when no attribute filtering is done (for sums this
88+
// means setting the value), but when attribute filtering is done it needs to
89+
// be added to any set value.
8590
type precomputedFilter[N int64 | float64] struct {
8691
filter attribute.Filter
87-
aggregator filterAgg[N]
92+
aggregator precomputeAggregator[N]
8893

8994
sync.Mutex
9095
seen map[attribute.Set]attribute.Set
9196
}
9297

93-
func newPrecomputedFilter[N int64 | float64](agg filterAgg[N], fn attribute.Filter) *precomputedFilter[N] {
98+
// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
99+
// with the attribute filter fn.
100+
//
101+
// This should not be used to wrap a non-pre-computed Aggregator. Use a
102+
// precomputedFilter instead.
103+
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
94104
return &precomputedFilter[N]{
95105
filter: fn,
96106
aggregator: agg,
97107
seen: make(map[attribute.Set]attribute.Set),
98108
}
99109
}
110+
111+
// Aggregate records the measurement, scoped by attr, and aggregates it
112+
// into an aggregation.
100113
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
101114
// TODO (#3006): drop stale attributes from seen.
102115
f.Lock()
@@ -110,10 +123,12 @@ func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
110123
// No filtering done.
111124
f.aggregator.Aggregate(measurement, fAttr)
112125
} else {
113-
f.aggregator.filtered(measurement, fAttr)
126+
f.aggregator.aggregateFiltered(measurement, fAttr)
114127
}
115128
}
116129

130+
// Aggregation returns an Aggregation, for all the aggregated
131+
// measurements made and ends an aggregation cycle.
117132
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
118133
return f.aggregator.Aggregation()
119134
}

sdk/metric/internal/sum.go

+55-17
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,17 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
158158
return out
159159
}
160160

161+
// precomputedValue is the recorded measurement value for a set of attributes.
161162
type precomputedValue[N int64 | float64] struct {
162-
// measured is the value directly measured.
163+
// measured is the last value measured for a set of attributes that were
164+
// not filtered.
163165
measured N
164-
// filtered is the sum of values from spatially aggregations.
166+
// filtered is the sum of values from measurements that had their
167+
// attributes filtered.
165168
filtered N
166169
}
167170

168-
// valueMap is the storage for precomputed sums.
171+
// precomputedMap is the storage for precomputed sums.
169172
type precomputedMap[N int64 | float64] struct {
170173
sync.Mutex
171174
values map[attribute.Set]precomputedValue[N]
@@ -177,7 +180,14 @@ func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
177180
}
178181
}
179182

180-
// Aggregate records value as a cumulative sum for attr.
183+
// Aggregate records value with the unfiltered attributes attr.
184+
//
185+
// If a previous measurement was made for the same attribute set:
186+
//
187+
// - If that measurement's attributes were not filtered, this value overwrite
188+
// that value.
189+
// - If that measurement's attributes were filtered, this value will be
190+
// recorded along side that value.
181191
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
182192
s.Lock()
183193
v := s.values[attr]
@@ -186,8 +196,18 @@ func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
186196
s.Unlock()
187197
}
188198

189-
// filtered records value with spatially re-aggregated attrs.
190-
func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
199+
// aggregateFiltered records value with the filtered attributes attr.
200+
//
201+
// If a previous measurement was made for the same attribute set:
202+
//
203+
// - If that measurement's attributes were not filtered, this value will be
204+
// recorded along side that value.
205+
// - If that measurement's attributes were filtered, this value will be
206+
// added to it.
207+
//
208+
// This method should not be used if attr have not been reduced by an attribute
209+
// filter.
210+
func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
191211
s.Lock()
192212
v := s.values[attr]
193213
v.filtered += value
@@ -196,15 +216,14 @@ func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: u
196216
}
197217

198218
// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
199-
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
200-
// attributes and the aggregation cycle the measurements were made in.
219+
// pre-computed sums. Each sum is scoped by attributes and the aggregation
220+
// cycle the measurements were made in.
201221
//
202222
// The monotonic value is used to communicate the produced Aggregation is
203223
// monotonic or not. The returned Aggregator does not make any guarantees this
204224
// value is accurate. It is up to the caller to ensure it.
205225
//
206-
// The output Aggregation will report recorded values as delta temporality. It
207-
// is up to the caller to ensure this is accurate.
226+
// The output Aggregation will report recorded values as delta temporality.
208227
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
209228
return &precomputedDeltaSum[N]{
210229
precomputedMap: newPrecomputedMap[N](),
@@ -214,8 +233,8 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
214233
}
215234
}
216235

217-
// precomputedDeltaSum summarizes a set of measurements recorded over all
218-
// aggregation cycles as the delta arithmetic sum.
236+
// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
237+
// aggregation cycles as the delta of these sums.
219238
type precomputedDeltaSum[N int64 | float64] struct {
220239
*precomputedMap[N]
221240

@@ -225,6 +244,16 @@ type precomputedDeltaSum[N int64 | float64] struct {
225244
start time.Time
226245
}
227246

247+
// Aggregation returns the recorded pre-computed sums as an Aggregation. The
248+
// sum values are expressed as the delta between what was measured this
249+
// collection cycle and the previous.
250+
//
251+
// All pre-computed sums that were recorded for attributes sets reduced by an
252+
// attribute filter (filtered-sums) are summed together and added to any
253+
// pre-computed sum value recorded directly for the resulting attribute set
254+
// (unfiltered-sum). The filtered-sums are reset to zero for the next
255+
// collection cycle, and the unfiltered-sum is kept for the next collection
256+
// cycle.
228257
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
229258
s.Lock()
230259
defer s.Unlock()
@@ -264,15 +293,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
264293
}
265294

266295
// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
267-
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
268-
// attributes and the aggregation cycle the measurements were made in.
296+
// pre-computed sums. Each sum is scoped by attributes and the aggregation
297+
// cycle the measurements were made in.
269298
//
270299
// The monotonic value is used to communicate the produced Aggregation is
271300
// monotonic or not. The returned Aggregator does not make any guarantees this
272301
// value is accurate. It is up to the caller to ensure it.
273302
//
274303
// The output Aggregation will report recorded values as cumulative
275-
// temporality. It is up to the caller to ensure this is accurate.
304+
// temporality.
276305
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
277306
return &precomputedCumulativeSum[N]{
278307
precomputedMap: newPrecomputedMap[N](),
@@ -281,15 +310,24 @@ func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N
281310
}
282311
}
283312

284-
// precomputedCumulativeSum summarizes a set of measurements recorded over all
285-
// aggregation cycles directly as the cumulative arithmetic sum.
313+
// precomputedCumulativeSum directly records and reports a set of pre-computed sums.
286314
type precomputedCumulativeSum[N int64 | float64] struct {
287315
*precomputedMap[N]
288316

289317
monotonic bool
290318
start time.Time
291319
}
292320

321+
// Aggregation returns the recorded pre-computed sums as an Aggregation. The
322+
// sum values are expressed directly as they are assumed to be recorded as the
323+
// cumulative sum of a some measured phenomena.
324+
//
325+
// All pre-computed sums that were recorded for attributes sets reduced by an
326+
// attribute filter (filtered-sums) are summed together and added to any
327+
// pre-computed sum value recorded directly for the resulting attribute set
328+
// (unfiltered-sum). The filtered-sums are reset to zero for the next
329+
// collection cycle, and the unfiltered-sum is kept for the next collection
330+
// cycle.
293331
func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
294332
s.Lock()
295333
defer s.Unlock()

sdk/metric/internal/sum_test.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestDeltaSumReset(t *testing.T) {
167167
func TestPreComputedDeltaSum(t *testing.T) {
168168
var mono bool
169169
agg := NewPrecomputedDeltaSum[int64](mono)
170-
require.Implements(t, (*filterAgg[int64])(nil), agg)
170+
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
171171

172172
attrs := attribute.NewSet(attribute.String("key", "val"))
173173
agg.Aggregate(1, attrs)
@@ -185,7 +185,7 @@ func TestPreComputedDeltaSum(t *testing.T) {
185185
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
186186
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
187187

188-
agg.(filterAgg[int64]).filtered(1, attrs)
188+
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
189189
got = agg.Aggregation()
190190
// measured(+): 1, previous(-): 1, filtered(+): 1
191191
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
@@ -205,8 +205,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
205205
agg.Aggregate(2, attrs)
206206
agg.Aggregate(5, attrs)
207207
// Filtered should add.
208-
agg.(filterAgg[int64]).filtered(3, attrs)
209-
agg.(filterAgg[int64]).filtered(10, attrs)
208+
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
209+
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
210210
got = agg.Aggregation()
211211
// measured(+): 5, previous(-): 1, filtered(+): 13
212212
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
@@ -221,9 +221,9 @@ func TestPreComputedDeltaSum(t *testing.T) {
221221

222222
// Order should not affect measure.
223223
// Filtered should add.
224-
agg.(filterAgg[int64]).filtered(3, attrs)
224+
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
225225
agg.Aggregate(7, attrs)
226-
agg.(filterAgg[int64]).filtered(10, attrs)
226+
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
227227
got = agg.Aggregation()
228228
// measured(+): 7, previous(-): 5, filtered(+): 13
229229
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)}
@@ -238,7 +238,7 @@ func TestPreComputedDeltaSum(t *testing.T) {
238238
func TestPreComputedCumulativeSum(t *testing.T) {
239239
var mono bool
240240
agg := NewPrecomputedCumulativeSum[int64](mono)
241-
require.Implements(t, (*filterAgg[int64])(nil), agg)
241+
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
242242

243243
attrs := attribute.NewSet(attribute.String("key", "val"))
244244
agg.Aggregate(1, attrs)
@@ -255,7 +255,7 @@ func TestPreComputedCumulativeSum(t *testing.T) {
255255
got = agg.Aggregation()
256256
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
257257

258-
agg.(filterAgg[int64]).filtered(1, attrs)
258+
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
259259
got = agg.Aggregation()
260260
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
261261
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
@@ -268,8 +268,8 @@ func TestPreComputedCumulativeSum(t *testing.T) {
268268
// Override set value.
269269
agg.Aggregate(5, attrs)
270270
// Filtered should add.
271-
agg.(filterAgg[int64]).filtered(3, attrs)
272-
agg.(filterAgg[int64]).filtered(10, attrs)
271+
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
272+
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
273273
got = agg.Aggregation()
274274
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
275275
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
@@ -281,9 +281,9 @@ func TestPreComputedCumulativeSum(t *testing.T) {
281281

282282
// Order should not affect measure.
283283
// Filtered should add.
284-
agg.(filterAgg[int64]).filtered(3, attrs)
284+
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
285285
agg.Aggregate(7, attrs)
286-
agg.(filterAgg[int64]).filtered(10, attrs)
286+
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
287287
got = agg.Aggregation()
288288
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
289289
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

0 commit comments

Comments
 (0)