-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathratecounter.go
126 lines (107 loc) · 3.13 KB
/
ratecounter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package ratecounter
import (
"strconv"
"sync"
"sync/atomic"
"time"
)
// A RateCounter is a thread-safe counter which returns the number of times
// 'Incr' has been called in the last interval.
//
// The interval is split into `resolution` partial counters arranged as a ring.
// Incr adds to the running total and to the partial counter selected by
// `current`; a background goroutine (see run) periodically advances `current`
// and subtracts the partial counter it is about to reuse from the total.
//
// NOTE(review): Counter is declared elsewhere in this package; the
// thread-safety claim assumes its Incr/Value/Reset methods are atomic —
// confirm against that definition.
type RateCounter struct {
	// counter is the running total reported by Rate and String.
	counter Counter
	// interval is the length of the window; the background ticker fires every
	// interval/resolution.
	interval time.Duration
	// resolution is the number of partial counters the window is divided into.
	resolution int
	// partials holds one counter per slice of the window, used as a ring.
	partials []Counter
	// current is the index into partials that Incr writes to. It is read and
	// updated with sync/atomic by Incr and run.
	current int32
	// running is 1 while the background goroutine is active and 0 otherwise.
	// It is accessed with sync/atomic.
	running int32
	// onStop, when non-nil, is called by the background goroutine after the
	// total has dropped to zero and the goroutine is about to exit.
	onStop func(r *RateCounter)
	// onStopLock guards onStop.
	onStopLock sync.RWMutex
}
// NewRateCounter returns a RateCounter that reports events seen during the
// last intrvl, using the default resolution of 20 partial counters.
func NewRateCounter(intrvl time.Duration) *RateCounter {
	// The remaining fields start at their zero values; WithResolution
	// allocates the partial counters.
	rc := &RateCounter{interval: intrvl}
	return rc.WithResolution(20)
}
// NewRateCounterWithResolution returns a RateCounter that reports events seen
// during the last intrvl, with the window divided into resolution partial
// counters. The resolution is applied through WithResolution.
func NewRateCounterWithResolution(intrvl time.Duration, resolution int) *RateCounter {
	rc := new(RateCounter)
	rc.interval = intrvl
	return rc.WithResolution(resolution)
}
// WithResolution sets the number of partial counters the interval is divided
// into (the constructors default to 20) and returns the receiver so calls can
// be chained. A fresh set of partial counters is allocated and the current
// index is rewound to the first one.
//
// It panics if resolution is less than 1.
func (r *RateCounter) WithResolution(resolution int) *RateCounter {
	if resolution <= 0 {
		panic("RateCounter resolution cannot be less than 1")
	}

	r.resolution = resolution
	r.partials = make([]Counter, resolution)
	r.current = 0

	return r
}
// OnStop registers f as the function to call each time the counter drops back
// to 0 and its background goroutine stops. This is useful for removing an idle
// counter from a collection. A later call replaces the earlier callback.
func (r *RateCounter) OnStop(f func(*RateCounter)) {
	r.onStopLock.Lock()
	defer r.onStopLock.Unlock()

	r.onStop = f
}
// run starts the background goroutine that ages out old events, unless one is
// already active (guarded by a compare-and-swap on r.running).
//
// Every interval/resolution the goroutine advances the ring of partial
// counters: the slot that is about to become current holds the oldest events,
// so its value is subtracted from the total and the slot is cleared before it
// is reused. Once the total drops to zero the goroutine clears r.running,
// stops its ticker, invokes the OnStop callback (if one is set) and exits; the
// next Incr starts a fresh goroutine.
//
// NOTE: time.NewTicker panics when interval/resolution is not positive.
func (r *RateCounter) run() {
	if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
		return
	}

	go func() {
		ticker := time.NewTicker(time.Duration(float64(r.interval) / float64(r.resolution)))

		for range ticker.C {
			current := atomic.LoadInt32(&r.current)
			next := (int(current) + 1) % r.resolution

			// Expire the oldest slot before it becomes the write target.
			r.counter.Incr(-1 * r.partials[next].Value())
			r.partials[next].Reset()
			atomic.CompareAndSwapInt32(&r.current, current, int32(next))

			if r.counter.Value() != 0 {
				continue
			}

			atomic.StoreInt32(&r.running, 0)

			// An Incr that landed between the zero check and the store above
			// saw running == 1 and therefore did not start a goroutine. If we
			// exited now its events would never be aged out, so re-check the
			// total and, if it is non-zero, try to take ownership back. When
			// the swap fails another goroutine has already been started by a
			// later Incr and this one can safely stop.
			if r.counter.Value() != 0 && atomic.CompareAndSwapInt32(&r.running, 0, 1) {
				continue
			}

			ticker.Stop()

			// Copy the callback and release the lock before calling it, so a
			// callback that itself calls OnStop (which takes the write lock)
			// cannot deadlock against our read lock.
			r.onStopLock.RLock()
			onStop := r.onStop
			r.onStopLock.RUnlock()

			if onStop != nil {
				onStop(r)
			}
			return
		}
	}()
}
// Incr records val events: it adds val to the running total and to the partial
// counter for the current slice of the interval, then makes sure the
// background goroutine that expires old slices is running.
func (r *RateCounter) Incr(val int64) {
	r.counter.Incr(val)

	slot := atomic.LoadInt32(&r.current)
	r.partials[slot].Incr(val)

	r.run()
}
// Rate returns the current number of events recorded in the last interval,
// i.e. the value of the running total.
func (r *RateCounter) Rate() int64 {
	total := r.counter.Value()
	return total
}
// MaxRate returns the largest value currently held by any single partial
// counter, i.e. the busiest interval/resolution slice of the window. It never
// returns less than 0.
//
// This is useful for estimating the event rate without the "averaging" effect
// of Rate. For example, with a 30 second interval and a steady 10 events per
// second, Rate needs the full 30 seconds to climb to 300, whereas MaxRate
// reflects the load as soon as a slice has been filled and keeps reporting
// that peak until the slice expires, even if the rate drops in the meantime.
func (r *RateCounter) MaxRate() int64 {
	var peak int64
	// Index into the slice rather than ranging by value so that Value is
	// called on the stored counter, not on a copy of it.
	for i := range r.partials[:r.resolution] {
		if v := r.partials[i].Value(); v > peak {
			peak = v
		}
	}
	return peak
}
// String returns the current running total formatted as a base-10 integer.
func (r *RateCounter) String() string {
	total := r.counter.Value()
	return strconv.FormatInt(total, 10)
}