Skip to content

Commit 5b9ccb8

Browse files
hden authored and mreiferson committed
consumer: add BackoffStrategy w/ Exponential and Jittered strategies
1 parent f9995d9 commit 5b9ccb8

File tree

4 files changed

+122
-19
lines changed

4 files changed

+122
-19
lines changed

config.go

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import (
77
"fmt"
88
"io/ioutil"
99
"log"
10+
"math"
11+
"math/rand"
1012
"net"
1113
"os"
1214
"reflect"
1315
"strconv"
1416
"strings"
17+
"sync"
1518
"time"
1619
"unsafe"
1720
)
@@ -27,6 +30,61 @@ type defaultsHandler interface {
2730
SetDefaults(c *Config) error
2831
}
2932

33+
// BackoffStrategy defines a strategy for calculating the duration of time
34+
// a consumer should back off for a given attempt.
35+
type BackoffStrategy interface {
// Calculate returns how long to back off for the given attempt number.
36+
Calculate(attempt int) time.Duration
37+
}
38+
39+
// ExponentialStrategy implements an exponential backoff strategy (the default).
40+
type ExponentialStrategy struct {
// cfg supplies the BackoffMultiplier and MaxBackoffDuration read by Calculate;
// it is assigned through setConfig.
41+
cfg *Config
42+
}
43+
44+
// Calculate returns a duration of time: 2 ^ attempt
45+
func (s *ExponentialStrategy) Calculate(attempt int) time.Duration {
46+
backoffDuration := s.cfg.BackoffMultiplier *
47+
time.Duration(math.Pow(2, float64(attempt)))
48+
if backoffDuration > s.cfg.MaxBackoffDuration {
49+
backoffDuration = s.cfg.MaxBackoffDuration
50+
}
51+
return backoffDuration
52+
}
53+
54+
// setConfig stores the Config whose BackoffMultiplier and MaxBackoffDuration
// are read by Calculate.
func (s *ExponentialStrategy) setConfig(cfg *Config) {
55+
s.cfg = cfg
56+
}
57+
58+
// FullJitterStrategy implements the jittered backoff described at
// http://www.awsarchitectureblog.com/2015/03/backoff.html
59+
type FullJitterStrategy struct {
// cfg supplies the BackoffMultiplier and MaxBackoffDuration read by Calculate;
// it is assigned through setConfig.
60+
cfg *Config
61+
62+
// rngOnce guards the one-time lazy creation of rng on the first Calculate call.
rngOnce sync.Once
// rng is the random source for the jitter; if it is already set before the
// first Calculate call it is kept as-is.
63+
rng *rand.Rand
64+
}
65+
66+
// Calculate returns a random duration of time [0, 2 ^ attempt]
67+
func (s *FullJitterStrategy) Calculate(attempt int) time.Duration {
68+
// lazily initialize the RNG
69+
s.rngOnce.Do(func() {
70+
if s.rng != nil {
71+
return
72+
}
73+
s.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
74+
})
75+
76+
backoffDuration := s.cfg.BackoffMultiplier *
77+
time.Duration(math.Pow(2, float64(attempt)))
78+
if backoffDuration > s.cfg.MaxBackoffDuration {
79+
backoffDuration = s.cfg.MaxBackoffDuration
80+
}
81+
return time.Duration(s.rng.Intn(int(backoffDuration)))
82+
}
83+
84+
// setConfig stores the Config whose BackoffMultiplier and MaxBackoffDuration
// are read by Calculate.
func (s *FullJitterStrategy) setConfig(cfg *Config) {
85+
s.cfg = cfg
86+
}
87+
3088
// Config is a struct of NSQ options
3189
//
3290
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
@@ -59,11 +117,17 @@ type Config struct {
59117
// Maximum duration when REQueueing (for doubling of deferred requeue)
60118
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
61119
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
120+
121+
// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algorithms.
122+
BackoffStrategy BackoffStrategy
123+
// Maximum amount of time to backoff when processing fails 0 == no backoff
124+
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
62125
// Unit of time for calculating consumer backoff
63126
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
64127

65128
// Maximum number of times this consumer will attempt to process a message before giving up
66129
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
130+
67131
// Amount of time in seconds to wait for a message from a producer when in a state where RDY
68132
// counts are re-distributed (ie. max_in_flight < num_producers)
69133
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
@@ -108,9 +172,6 @@ type Config struct {
108172
// Maximum number of messages to allow in flight (concurrency knob)
109173
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
110174

111-
// Maximum amount of time to backoff when processing fails 0 == no backoff
112-
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
113-
114175
// The server-side message timeout for messages delivered to this client
115176
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
116177

@@ -122,9 +183,10 @@ type Config struct {
122183
//
123184
// This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
124185
func NewConfig() *Config {
125-
c := &Config{}
126-
c.configHandlers = append(c.configHandlers, &structTagsConfig{}, &tlsConfig{})
127-
c.initialized = true
186+
c := &Config{
187+
configHandlers: []configHandler{&structTagsConfig{}, &tlsConfig{}},
188+
initialized: true,
189+
}
128190
if err := c.setDefaults(); err != nil {
129191
panic(err.Error())
130192
}
@@ -170,7 +232,6 @@ func (c *Config) assertInitialized() {
170232
// Validate checks that all values are within specified min/max ranges
171233
func (c *Config) Validate() error {
172234
c.assertInitialized()
173-
174235
for _, h := range c.configHandlers {
175236
if err := h.Validate(c); err != nil {
176237
return err
@@ -188,7 +249,6 @@ func (c *Config) setDefaults() error {
188249
}
189250
}
190251
}
191-
192252
return nil
193253
}
194254

@@ -271,6 +331,7 @@ func (h *structTagsConfig) SetDefaults(c *Config) error {
271331
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
272332
}
273333

334+
c.BackoffStrategy = &ExponentialStrategy{}
274335
c.ClientID = strings.Split(hostname, ".")[0]
275336
c.Hostname = hostname
276337
c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION)
@@ -311,6 +372,18 @@ func (h *structTagsConfig) Validate(c *Config) error {
311372
if c.HeartbeatInterval > c.ReadTimeout {
312373
return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout)
313374
}
375+
376+
if c.BackoffStrategy == nil {
377+
return fmt.Errorf("BackoffStrategy cannot be nil")
378+
}
379+
380+
// initialize internal backoff strategies that need access to config
381+
if v, ok := c.BackoffStrategy.(interface {
382+
setConfig(*Config)
383+
}); ok {
384+
v.setConfig(c)
385+
}
386+
314387
return nil
315388
}
316389

config_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package nsq
22

33
import (
4+
"math/rand"
45
"net"
56
"testing"
7+
"time"
68
)
79

810
func TestConfigSet(t *testing.T) {
@@ -53,3 +55,40 @@ func TestConfigValidate(t *testing.T) {
5355
t.Error("no error set for invalid value")
5456
}
5557
}
58+
59+
func TestExponentialBackoff(t *testing.T) {
60+
expected := []time.Duration{
61+
1 * time.Second,
62+
2 * time.Second,
63+
8 * time.Second,
64+
32 * time.Second,
65+
}
66+
backoffTest(t, expected, func(c *Config) BackoffStrategy {
67+
return &ExponentialStrategy{cfg: c}
68+
})
69+
}
70+
71+
func TestFullJitterBackoff(t *testing.T) {
72+
expected := []time.Duration{
73+
566028617 * time.Nanosecond,
74+
1365407263 * time.Nanosecond,
75+
5232470547 * time.Nanosecond,
76+
21467499218 * time.Nanosecond,
77+
}
78+
backoffTest(t, expected, func(c *Config) BackoffStrategy {
79+
return &FullJitterStrategy{cfg: c, rng: rand.New(rand.NewSource(99))}
80+
})
81+
}
82+
83+
func backoffTest(t *testing.T, expected []time.Duration, cb func(c *Config) BackoffStrategy) {
84+
config := NewConfig()
85+
attempts := []int{0, 1, 3, 5}
86+
s := cb(config)
87+
for i := range attempts {
88+
result := s.Calculate(attempts[i])
89+
if result != expected[i] {
90+
t.Fatalf("wrong backoff duration %v for attempt %d (should be %v)",
91+
result, attempts[i], expected[i])
92+
}
93+
}
94+
}

consumer.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
676676
}
677677
} else if r.backoffCounter > 0 {
678678
// start or continue backoff
679-
backoffDuration := r.backoffDurationForCount(r.backoffCounter)
679+
backoffDuration := r.config.BackoffStrategy.Calculate(int(r.backoffCounter))
680680
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
681681
time.AfterFunc(backoffDuration, r.backoff)
682682

@@ -814,15 +814,6 @@ func (r *Consumer) onConnClose(c *Conn) {
814814
}
815815
}
816816

817-
func (r *Consumer) backoffDurationForCount(count int32) time.Duration {
818-
backoffDuration := r.config.BackoffMultiplier *
819-
time.Duration(math.Pow(2, float64(count)))
820-
if backoffDuration > r.config.MaxBackoffDuration {
821-
backoffDuration = r.config.MaxBackoffDuration
822-
}
823-
return backoffDuration
824-
}
825-
826817
func (r *Consumer) inBackoff() bool {
827818
r.backoffMtx.RLock()
828819
backoffCounter := r.backoffCounter

consumer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestConsumerTLSClientCertViaSet(t *testing.T) {
141141

142142
func consumerTest(t *testing.T, cb func(c *Config)) {
143143
config := NewConfig()
144-
laddr := "127.0.0.2"
144+
laddr := "127.0.0.1"
145145
// so that the test can simulate binding consumer to specified address
146146
config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0")
147147
// so that the test can simulate reaching max requeues and a call to LogFailedMessage

0 commit comments

Comments
 (0)