Skip to content

Commit 9a2484c

Browse files
evantorrierghetia
authored andcommitted
Implement support for NonAbsolute Measurement MaxSumCount (#335)
* Add tests for nonabsolute and varying sign values * Implement support for NonAbsolute Measurement MaxSumCount Previously, the MaxSumCount aggregator failed to work correctly with negative numbers (e.g. MeasureKind Alternate()==true). * Pass NumberKind to MaxSumCount New() function Allows it to set the initial state (current.max) to the correct value based on the NumberKind. * Revert extraneous local change * Pass full descriptor to msc New() This is analagous to the DDSketch New() constructor * Remember to run make precommit first * Add tests for empty checkpoint of MaxSumCount aggregator An empty checkpoint should have Sum() == 0, Count() == 0 and Max() still equal to the numberKind.Minimum() * Return ErrEmptyDataSet if no value set by the aggregator Remove TODO from stdout exporter to ensure that if a maxsumcount or ddsketch aggregator returns ErrEmptyDataSet from Max(), then the entire record will be skipped by the exporter. Added tests to ensure the exporter doesn't send any updates for EmptyDataSet checkpoints - for both ddsketch and maxsumcount. * Relayout Aggreggator struct to ensure int64s are 8-byte aligned On 32-bit architectures, Go only guarantees that primitive values are aligned to a 4 byte boundary. Atomic operations on 32-bit machines require 8-byte alignment. See golang/go#599 * Addressing PR comments The use of Minimum() for the default uninitialized Maximum value means that in the unlikely condition that every recorded value for a measure is equal to the same NumberKind.Minimum(), then the aggregator's Max() will return ErrEmptyDataSet * Fix PR merge issue
1 parent 0f052af commit 9a2484c

File tree

8 files changed

+167
-56
lines changed

8 files changed

+167
-56
lines changed

api/core/number.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,21 @@ const (
3434
Uint64NumberKind
3535
)
3636

37+
// Minimum returns the minimum representable value
38+
// for a given NumberKind
39+
func (k NumberKind) Minimum() Number {
40+
switch k {
41+
case Int64NumberKind:
42+
return NewInt64Number(math.MinInt64)
43+
case Float64NumberKind:
44+
return NewFloat64Number(-1. * math.MaxFloat64)
45+
case Uint64NumberKind:
46+
return NewUint64Number(0)
47+
default:
48+
return Number(0)
49+
}
50+
}
51+
3752
// Number represents either an integral or a floating point value. It
3853
// needs to be accompanied with a source of NumberKind that describes
3954
// the actual type of the value stored within Number.

exporter/metric/stdout/stdout.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,13 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
130130
expose.Count = count
131131
}
132132

133-
// TODO: Should tolerate ErrEmptyDataSet here,
134-
// just like ErrNoLastValue below, since
135-
// there's a race condition between creating
136-
// the Aggregator and updating the first
137-
// value.
138-
139133
if max, err := msc.Max(); err != nil {
134+
if err == aggregator.ErrEmptyDataSet {
135+
// This is a special case, indicates an aggregator that
136+
// was checkpointed before its first value was set.
137+
return
138+
}
139+
140140
aggError = err
141141
expose.Max = "NaN"
142142
} else {

exporter/metric/stdout/stdout_test.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func TestStdoutMaxSumCount(t *testing.T) {
162162
checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
163163

164164
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
165-
magg := maxsumcount.New()
165+
magg := maxsumcount.New(desc)
166166
aggtest.CheckedUpdate(fix.t, magg, core.NewFloat64Number(123.456), desc)
167167
aggtest.CheckedUpdate(fix.t, magg, core.NewFloat64Number(876.543), desc)
168168
magg.Checkpoint(fix.ctx, desc)
@@ -220,23 +220,30 @@ func TestStdoutMeasureFormat(t *testing.T) {
220220
}`, fix.Output())
221221
}
222222

223-
func TestStdoutAggError(t *testing.T) {
224-
fix := newFixture(t, stdout.Options{})
223+
func TestStdoutEmptyDataSet(t *testing.T) {
224+
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
225+
for name, tc := range map[string]export.Aggregator{
226+
"ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc),
227+
"maxsumcount": maxsumcount.New(desc),
228+
} {
229+
tc := tc
230+
t.Run(name, func(t *testing.T) {
231+
t.Parallel()
225232

226-
checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
233+
fix := newFixture(t, stdout.Options{})
227234

228-
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
229-
magg := ddsketch.New(ddsketch.NewDefaultConfig(), desc)
230-
magg.Checkpoint(fix.ctx, desc)
235+
checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
231236

232-
checkpointSet.Add(desc, magg)
237+
magg := tc
238+
magg.Checkpoint(fix.ctx, desc)
233239

234-
err := fix.exporter.Export(fix.ctx, checkpointSet)
240+
checkpointSet.Add(desc, magg)
235241

236-
// An error is returned and NaN values are printed.
237-
require.Error(t, err)
238-
require.Equal(t, aggregator.ErrEmptyDataSet, err)
239-
require.Equal(t, `{"updates":[{"name":"test.name","max":"NaN","sum":0,"count":0,"quantiles":[{"q":0.5,"v":"NaN"},{"q":0.9,"v":"NaN"},{"q":0.99,"v":"NaN"}]}]}`, fix.Output())
242+
fix.Export(checkpointSet)
243+
244+
require.Equal(t, `{"updates":null}`, fix.Output())
245+
})
246+
}
240247
}
241248

242249
func TestStdoutGaugeNotSet(t *testing.T) {

sdk/metric/aggregator/ddsketch/ddsketch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (c *Aggregator) Max() (core.Number, error) {
7575
return c.Quantile(1)
7676
}
7777

78-
// Min returns the mininum value in the checkpoint.
78+
// Min returns the minimum value in the checkpoint.
7979
func (c *Aggregator) Min() (core.Number, error) {
8080
return c.Quantile(0)
8181
}

sdk/metric/aggregator/maxsumcount/msc.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type (
2828
Aggregator struct {
2929
current state
3030
checkpoint state
31+
kind core.NumberKind
3132
}
3233

3334
state struct {
@@ -50,8 +51,15 @@ var _ aggregator.MaxSumCount = &Aggregator{}
5051
// atomic operations, which introduces the possibility that
5152
// checkpoints are inconsistent. For greater consistency and lower
5253
// performance, consider using Array or DDSketch aggregators.
53-
func New() *Aggregator {
54-
return &Aggregator{}
54+
func New(desc *export.Descriptor) *Aggregator {
55+
return &Aggregator{
56+
kind: desc.NumberKind(),
57+
current: unsetMaxSumCount(desc.NumberKind()),
58+
}
59+
}
60+
61+
func unsetMaxSumCount(kind core.NumberKind) state {
62+
return state{max: kind.Minimum()}
5563
}
5664

5765
// Sum returns the sum of values in the checkpoint.
@@ -65,15 +73,24 @@ func (c *Aggregator) Count() (int64, error) {
6573
}
6674

6775
// Max returns the maximum value in the checkpoint.
76+
// The error value aggregator.ErrEmptyDataSet will be returned if
77+
// (due to a race condition) the checkpoint was set prior to the
78+
// current.max being computed in Update().
79+
//
80+
// Note: If a measure's recorded values for a given checkpoint are
81+
// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet
6882
func (c *Aggregator) Max() (core.Number, error) {
83+
if c.checkpoint.max == c.kind.Minimum() {
84+
return core.Number(0), aggregator.ErrEmptyDataSet
85+
}
6986
return c.checkpoint.max, nil
7087
}
7188

7289
// Checkpoint saves the current state and resets the current state to
7390
// the empty set. Since no locks are taken, there is a chance that
7491
// the independent Max, Sum, and Count are not consistent with each
7592
// other.
76-
func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) {
93+
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
7794
// N.B. There is no atomic operation that can update all three
7895
// values at once without a memory allocation.
7996
//
@@ -86,7 +103,7 @@ func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) {
86103

87104
c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
88105
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
89-
c.checkpoint.max = c.current.max.SwapNumberAtomic(core.Number(0))
106+
c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum())
90107
}
91108

92109
// Update adds the recorded measurement to the current data set.

sdk/metric/aggregator/maxsumcount/msc_test.go

Lines changed: 102 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,55 +16,104 @@ package maxsumcount
1616

1717
import (
1818
"context"
19+
"math"
20+
"math/rand"
1921
"testing"
2022

2123
"github.com/stretchr/testify/require"
2224

25+
"go.opentelemetry.io/otel/api/core"
2326
export "go.opentelemetry.io/otel/sdk/export/metric"
27+
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
2428
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
2529
)
2630

2731
const count = 100
2832

33+
type policy struct {
34+
name string
35+
absolute bool
36+
sign func() int
37+
}
38+
39+
var (
40+
positiveOnly = policy{
41+
name: "absolute",
42+
absolute: true,
43+
sign: func() int { return +1 },
44+
}
45+
negativeOnly = policy{
46+
name: "negative",
47+
absolute: false,
48+
sign: func() int { return -1 },
49+
}
50+
positiveAndNegative = policy{
51+
name: "positiveAndNegative",
52+
absolute: false,
53+
sign: func() int {
54+
if rand.Uint32() > math.MaxUint32/2 {
55+
return -1
56+
}
57+
return 1
58+
},
59+
}
60+
)
61+
2962
func TestMaxSumCountAbsolute(t *testing.T) {
30-
ctx := context.Background()
63+
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
64+
maxSumCount(t, profile, positiveOnly)
65+
})
66+
}
3167

68+
func TestMaxSumCountNegativeOnly(t *testing.T) {
3269
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
33-
record := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false)
70+
maxSumCount(t, profile, negativeOnly)
71+
})
72+
}
3473

35-
agg := New()
74+
func TestMaxSumCountPositiveAndNegative(t *testing.T) {
75+
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
76+
maxSumCount(t, profile, positiveAndNegative)
77+
})
78+
}
3679

37-
all := test.NewNumbers(profile.NumberKind)
80+
// Validates max, sum and count for a given profile and policy
81+
func maxSumCount(t *testing.T, profile test.Profile, policy policy) {
82+
ctx := context.Background()
83+
descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute)
3884

39-
for i := 0; i < count; i++ {
40-
x := profile.Random(+1)
41-
all.Append(x)
42-
test.CheckedUpdate(t, agg, x, record)
43-
}
85+
agg := New(descriptor)
4486

45-
agg.Checkpoint(ctx, record)
87+
all := test.NewNumbers(profile.NumberKind)
4688

47-
all.Sort()
89+
for i := 0; i < count; i++ {
90+
x := profile.Random(policy.sign())
91+
all.Append(x)
92+
test.CheckedUpdate(t, agg, x, descriptor)
93+
}
4894

49-
asum, err := agg.Sum()
50-
require.InEpsilon(t,
51-
all.Sum().CoerceToFloat64(profile.NumberKind),
52-
asum.CoerceToFloat64(profile.NumberKind),
53-
0.000000001,
54-
"Same sum - absolute")
55-
require.Nil(t, err)
95+
agg.Checkpoint(ctx, descriptor)
5696

57-
count, err := agg.Count()
58-
require.Equal(t, all.Count(), count, "Same count - absolute")
59-
require.Nil(t, err)
97+
all.Sort()
6098

61-
max, err := agg.Max()
62-
require.Nil(t, err)
63-
require.Equal(t,
64-
all.Max(),
65-
max,
66-
"Same max - absolute")
67-
})
99+
asum, err := agg.Sum()
100+
require.InEpsilon(t,
101+
all.Sum().CoerceToFloat64(profile.NumberKind),
102+
asum.CoerceToFloat64(profile.NumberKind),
103+
0.000000001,
104+
"Same sum - "+policy.name)
105+
require.Nil(t, err)
106+
107+
count, err := agg.Count()
108+
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
109+
require.Nil(t, err)
110+
111+
max, err := agg.Max()
112+
require.Nil(t, err)
113+
require.Equal(t,
114+
all.Max(),
115+
max,
116+
"Same max -"+policy.name)
68117
}
69118

70119
func TestMaxSumCountMerge(t *testing.T) {
@@ -73,8 +122,8 @@ func TestMaxSumCountMerge(t *testing.T) {
73122
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
74123
descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false)
75124

76-
agg1 := New()
77-
agg2 := New()
125+
agg1 := New(descriptor)
126+
agg2 := New(descriptor)
78127

79128
all := test.NewNumbers(profile.NumberKind)
80129

@@ -116,3 +165,26 @@ func TestMaxSumCountMerge(t *testing.T) {
116165
"Same max - absolute")
117166
})
118167
}
168+
169+
func TestMaxSumCountNotSet(t *testing.T) {
170+
ctx := context.Background()
171+
172+
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
173+
descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false)
174+
175+
agg := New(descriptor)
176+
agg.Checkpoint(ctx, descriptor)
177+
178+
asum, err := agg.Sum()
179+
require.Equal(t, core.Number(0), asum, "Empty checkpoint sum = 0")
180+
require.Nil(t, err)
181+
182+
count, err := agg.Count()
183+
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
184+
require.Nil(t, err)
185+
186+
max, err := agg.Max()
187+
require.Equal(t, aggregator.ErrEmptyDataSet, err)
188+
require.Equal(t, core.Number(0), max)
189+
})
190+
}

sdk/metric/benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (*benchFixture) AggregatorFor(descriptor *export.Descriptor) export.Aggrega
5454
return gauge.New()
5555
case export.MeasureKind:
5656
if strings.HasSuffix(descriptor.Name(), "maxsumcount") {
57-
return maxsumcount.New()
57+
return maxsumcount.New(descriptor)
5858
} else if strings.HasSuffix(descriptor.Name(), "ddsketch") {
5959
return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor)
6060
} else if strings.HasSuffix(descriptor.Name(), "array") {

sdk/metric/selector/simple/simple.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A
7171
case export.GaugeKind:
7272
return gauge.New()
7373
case export.MeasureKind:
74-
return maxsumcount.New()
74+
return maxsumcount.New(descriptor)
7575
default:
7676
return counter.New()
7777
}

0 commit comments

Comments
 (0)