Skip to content

Commit 8680783

Browse files
evantorrierghetia
andauthored
Use StateLocker in MinMaxSumCount (open-telemetry#546)
* Add MinMaxSumCount stress test * Reimplement MinMaxSumCount using StateLocker * Address PR comments * Round open-telemetry#2 of PR comments Co-authored-by: Rahul Patel <[email protected]>
1 parent 46ac030 commit 8680783

File tree

5 files changed

+186
-73
lines changed

5 files changed

+186
-73
lines changed

api/core/number.go

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ const (
3434
Uint64NumberKind
3535
)
3636

37+
// Zero returns a zero value for a given NumberKind
38+
func (k NumberKind) Zero() Number {
39+
switch k {
40+
case Int64NumberKind:
41+
return NewInt64Number(0)
42+
case Float64NumberKind:
43+
return NewFloat64Number(0.)
44+
case Uint64NumberKind:
45+
return NewUint64Number(0)
46+
default:
47+
return Number(0)
48+
}
49+
}
50+
3751
// Minimum returns the minimum representable value
3852
// for a given NumberKind
3953
func (k NumberKind) Minimum() Number {

sdk/metric/aggregator/minmaxsumcount/mmsc.go

+86-66
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ import (
2020
"go.opentelemetry.io/otel/api/core"
2121
export "go.opentelemetry.io/otel/sdk/export/metric"
2222
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
23+
"go.opentelemetry.io/otel/sdk/internal"
2324
)
2425

2526
type (
2627
// Aggregator aggregates measure events, keeping only the max,
2728
// sum, and count.
2829
Aggregator struct {
29-
// current has to be aligned for 64-bit atomic operations.
30-
current state
31-
// checkpoint has to be aligned for 64-bit atomic operations.
32-
checkpoint state
33-
kind core.NumberKind
30+
// states has to be aligned for 64-bit atomic operations.
31+
states [2]state
32+
lock internal.StateLocker
33+
kind core.NumberKind
3434
}
3535

3636
state struct {
@@ -48,104 +48,116 @@ var _ aggregator.MinMaxSumCount = &Aggregator{}
4848
// New returns a new measure aggregator for computing min, max, sum, and
4949
// count. It does not compute quantile information other than Max.
5050
//
51-
// Note that this aggregator maintains each value using independent
52-
// atomic operations, which introduces the possibility that
53-
// checkpoints are inconsistent. For greater consistency and lower
54-
// performance, consider using Array or DDSketch aggregators.
51+
// This aggregator uses the StateLocker pattern to guarantee
52+
// the count, sum, min and max are consistent within a checkpoint
5553
func New(desc *export.Descriptor) *Aggregator {
54+
kind := desc.NumberKind()
5655
return &Aggregator{
57-
kind: desc.NumberKind(),
58-
current: unsetMinMaxSumCount(desc.NumberKind()),
56+
kind: kind,
57+
states: [2]state{
58+
{
59+
count: core.NewUint64Number(0),
60+
sum: kind.Zero(),
61+
min: kind.Maximum(),
62+
max: kind.Minimum(),
63+
},
64+
{
65+
count: core.NewUint64Number(0),
66+
sum: kind.Zero(),
67+
min: kind.Maximum(),
68+
max: kind.Minimum(),
69+
},
70+
},
5971
}
6072
}
6173

62-
func unsetMinMaxSumCount(kind core.NumberKind) state {
63-
return state{min: kind.Maximum(), max: kind.Minimum()}
64-
}
65-
6674
// Sum returns the sum of values in the checkpoint.
6775
func (c *Aggregator) Sum() (core.Number, error) {
68-
return c.checkpoint.sum, nil
76+
c.lock.Lock()
77+
defer c.lock.Unlock()
78+
return c.checkpoint().sum, nil
6979
}
7080

7181
// Count returns the number of values in the checkpoint.
7282
func (c *Aggregator) Count() (int64, error) {
73-
return int64(c.checkpoint.count.AsUint64()), nil
83+
c.lock.Lock()
84+
defer c.lock.Unlock()
85+
return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil
7486
}
7587

7688
// Min returns the minimum value in the checkpoint.
77-
// The error value aggregator.ErrEmptyDataSet will be returned if
78-
// (due to a race condition) the checkpoint was set prior to
79-
// current.min being computed in Update().
80-
//
81-
// Note: If a measure's recorded values for a given checkpoint are
82-
// all equal to NumberKind.Maximum(), Min() will return ErrEmptyDataSet
89+
// The error value aggregator.ErrEmptyDataSet will be returned
90+
// if there were no measurements recorded during the checkpoint.
8391
func (c *Aggregator) Min() (core.Number, error) {
84-
if c.checkpoint.min == c.kind.Maximum() {
85-
return core.Number(0), aggregator.ErrEmptyDataSet
92+
c.lock.Lock()
93+
defer c.lock.Unlock()
94+
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
95+
return c.kind.Zero(), aggregator.ErrEmptyDataSet
8696
}
87-
return c.checkpoint.min, nil
97+
return c.checkpoint().min, nil
8898
}
8999

90100
// Max returns the maximum value in the checkpoint.
91-
// The error value aggregator.ErrEmptyDataSet will be returned if
92-
// (due to a race condition) the checkpoint was set prior to
93-
// current.max being computed in Update().
94-
//
95-
// Note: If a measure's recorded values for a given checkpoint are
96-
// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet
101+
// The error value aggregator.ErrEmptyDataSet will be returned
102+
// if there were no measurements recorded during the checkpoint.
97103
func (c *Aggregator) Max() (core.Number, error) {
98-
if c.checkpoint.max == c.kind.Minimum() {
99-
return core.Number(0), aggregator.ErrEmptyDataSet
104+
c.lock.Lock()
105+
defer c.lock.Unlock()
106+
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
107+
return c.kind.Zero(), aggregator.ErrEmptyDataSet
100108
}
101-
return c.checkpoint.max, nil
109+
return c.checkpoint().max, nil
102110
}
103111

104112
// Checkpoint saves the current state and resets the current state to
105-
// the empty set. Since no locks are taken, there is a chance that
106-
// the independent Min, Max, Sum, and Count are not consistent with each
107-
// other.
113+
// the empty set.
108114
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
109-
// N.B. There is no atomic operation that can update all three
110-
// values at once without a memory allocation.
111-
//
112-
// This aggregator is intended to trade this correctness for
113-
// speed.
114-
//
115-
// Therefore, atomically swap fields independently, knowing
116-
// that individually the three parts of this aggregation could
117-
// be spread across multiple collections in rare cases.
118-
119-
c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
120-
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
121-
c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum())
122-
c.checkpoint.min = c.current.min.SwapNumberAtomic(c.kind.Maximum())
115+
c.lock.SwapActiveState(c.resetCheckpoint)
116+
}
117+
118+
// checkpoint returns the "cold" state, i.e. state collected prior to the
119+
// most recent Checkpoint() call
120+
func (c *Aggregator) checkpoint() *state {
121+
return &c.states[c.lock.ColdIdx()]
122+
}
123+
124+
func (c *Aggregator) resetCheckpoint() {
125+
checkpoint := c.checkpoint()
126+
127+
checkpoint.count.SetUint64(0)
128+
checkpoint.sum.SetNumber(c.kind.Zero())
129+
checkpoint.min.SetNumber(c.kind.Maximum())
130+
checkpoint.max.SetNumber(c.kind.Minimum())
123131
}
124132

125133
// Update adds the recorded measurement to the current data set.
126134
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
127135
kind := desc.NumberKind()
128136

129-
c.current.count.AddUint64Atomic(1)
130-
c.current.sum.AddNumberAtomic(kind, number)
137+
cIdx := c.lock.Start()
138+
defer c.lock.End(cIdx)
139+
140+
current := &c.states[cIdx]
141+
current.count.AddUint64Atomic(1)
142+
current.sum.AddNumberAtomic(kind, number)
131143

132144
for {
133-
current := c.current.min.AsNumberAtomic()
145+
cmin := current.min.AsNumberAtomic()
134146

135-
if number.CompareNumber(kind, current) >= 0 {
147+
if number.CompareNumber(kind, cmin) >= 0 {
136148
break
137149
}
138-
if c.current.min.CompareAndSwapNumber(current, number) {
150+
if current.min.CompareAndSwapNumber(cmin, number) {
139151
break
140152
}
141153
}
142154
for {
143-
current := c.current.max.AsNumberAtomic()
155+
cmax := current.max.AsNumberAtomic()
144156

145-
if number.CompareNumber(kind, current) <= 0 {
157+
if number.CompareNumber(kind, cmax) <= 0 {
146158
break
147159
}
148-
if c.current.max.CompareAndSwapNumber(current, number) {
160+
if current.max.CompareAndSwapNumber(cmax, number) {
149161
break
150162
}
151163
}
@@ -159,14 +171,22 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
159171
return aggregator.NewInconsistentMergeError(c, oa)
160172
}
161173

162-
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
163-
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
174+
// Lock() synchronizes Merge() and Checkpoint() to ensure all operations of
175+
// Merge() are performed on the same state.
176+
c.lock.Lock()
177+
defer c.lock.Unlock()
178+
179+
current := c.checkpoint()
180+
ocheckpoint := o.checkpoint()
181+
182+
current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
183+
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)
164184

165-
if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
166-
c.checkpoint.min.SetNumber(o.checkpoint.min)
185+
if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 {
186+
current.min.SetNumber(ocheckpoint.min)
167187
}
168-
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
169-
c.checkpoint.max.SetNumber(o.checkpoint.max)
188+
if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 {
189+
current.max.SetNumber(ocheckpoint.max)
170190
}
171191
return nil
172192
}

sdk/metric/aggregator/minmaxsumcount/mmsc_test.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,8 @@ var (
6666
func TestMain(m *testing.M) {
6767
fields := []ottest.FieldOffset{
6868
{
69-
Name: "Aggregator.current",
70-
Offset: unsafe.Offsetof(Aggregator{}.current),
71-
},
72-
{
73-
Name: "Aggregator.checkpoint",
74-
Offset: unsafe.Offsetof(Aggregator{}.checkpoint),
69+
Name: "Aggregator.states",
70+
Offset: unsafe.Offsetof(Aggregator{}.states),
7571
},
7672
{
7773
Name: "state.count",
+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// This test is too large for the race detector. This SDK uses no locks
16+
// that the race detector would help with, anyway.
17+
// +build !race
18+
19+
package metric_test
20+
21+
import (
22+
"context"
23+
"math/rand"
24+
"testing"
25+
"time"
26+
27+
"go.opentelemetry.io/otel/api/core"
28+
"go.opentelemetry.io/otel/sdk/export/metric"
29+
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
30+
)
31+
32+
func TestStressInt64MinMaxSumCount(t *testing.T) {
33+
desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind)
34+
mmsc := minmaxsumcount.New(desc)
35+
36+
ctx, cancel := context.WithCancel(context.Background())
37+
defer cancel()
38+
go func() {
39+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
40+
v := rnd.Int63() % 103
41+
for {
42+
select {
43+
case <-ctx.Done():
44+
return
45+
default:
46+
_ = mmsc.Update(ctx, core.NewInt64Number(v), desc)
47+
}
48+
v++
49+
}
50+
}()
51+
52+
startTime := time.Now()
53+
for time.Since(startTime) < time.Second {
54+
mmsc.Checkpoint(context.Background(), desc)
55+
56+
s, _ := mmsc.Sum()
57+
c, _ := mmsc.Count()
58+
min, e1 := mmsc.Min()
59+
max, e2 := mmsc.Max()
60+
if c == 0 && (e1 == nil || e2 == nil || s.AsInt64() != 0) {
61+
t.Fail()
62+
}
63+
if c != 0 {
64+
if e1 != nil || e2 != nil {
65+
t.Fail()
66+
}
67+
lo, hi, sum := min.AsInt64(), max.AsInt64(), s.AsInt64()
68+
69+
if hi-lo+1 != c {
70+
t.Fail()
71+
}
72+
if c == 1 {
73+
if lo != hi || lo != sum {
74+
t.Fail()
75+
}
76+
} else {
77+
if hi*(hi+1)/2-(lo-1)*lo/2 != sum {
78+
t.Fail()
79+
}
80+
}
81+
}
82+
}
83+
}

sdk/metric/stress_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019, OpenTelemetry Authors
1+
// Copyright 2020, OpenTelemetry Authors
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)