Skip to content

Commit 5e7a8b2

Browse files
committed
feat: add jitter and initial health check wait support to upstreams
Add two features: - Support for health check jitter: if we start many loadbalancers at once, they might do all of their healthchecks at the same time. Jitter helps us avoid this issue. The initial health check is always done immediately. - Ability to wait for the initial health check to complete: when we create an upstream list and immediately try to pick a healthy upstream from it, it might — based on the initial scores — return either no upstream or one of the down ones. This is because the health checks, which are done in a separate goroutine, didn't get the chance to complete yet. We can avoid this issue by waiting for the initial health check to be done before calling `.Pick()`, together with using correct initial scores. Signed-off-by: Utku Ozdemir <[email protected]>
1 parent 589c33a commit 5e7a8b2

File tree

2 files changed

+75
-22
lines changed

2 files changed

+75
-22
lines changed

upstream/upstream.go

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"iter"
13+
"math/rand/v2"
1314
"slices"
1415
"sync"
1516
"time"
@@ -94,6 +95,19 @@ func WithHealthcheckInterval(interval time.Duration) ListOption {
9495
}
9596
}
9697

98+
// WithHealthCheckJitter configures healthcheck jitter (0.0 - 1.0).
99+
func WithHealthCheckJitter(jitter float64) ListOption {
100+
return func(l *ListConfig) error {
101+
if jitter < 0 || jitter > 1 {
102+
return fmt.Errorf("healthcheck jitter should in range [0, 1]: %f", jitter)
103+
}
104+
105+
l.healthcheckJitter = jitter
106+
107+
return nil
108+
}
109+
}
110+
97111
// WithHealthcheckTimeout configures healthcheck timeout (for each backend).
98112
func WithHealthcheckTimeout(timeout time.Duration) ListOption {
99113
return func(l *ListConfig) error {
@@ -159,6 +173,9 @@ type ListConfig struct { //nolint:govet
159173
lowScore, highScore float64
160174
failScoreDelta, successScoreDelta float64
161175
initialScore float64
176+
healthcheckJitter float64
177+
178+
initialHealthcheckDoneCh chan struct{}
162179

163180
minTier, maxTier, initTier Tier
164181
}
@@ -191,6 +208,8 @@ func NewListWithCmp[T Backend](upstreams iter.Seq[T], cmp func(T, T) bool, optio
191208
minTier: 0,
192209
maxTier: 4,
193210
initTier: 0,
211+
212+
initialHealthcheckDoneCh: make(chan struct{}),
194213
},
195214

196215
cmp: cmp,
@@ -321,6 +340,16 @@ func (list *List[T]) downWithTier(upstream T, newTier Tier) {
321340
}
322341
}
323342

343+
// WaitForInitialHealthcheck waits for initial healthcheck to be completed.
344+
func (list *List[T]) WaitForInitialHealthcheck(ctx context.Context) error {
345+
select {
346+
case <-ctx.Done():
347+
return ctx.Err()
348+
case <-list.initialHealthcheckDoneCh:
349+
return nil
350+
}
351+
}
352+
324353
// Pick returns next backend to be used.
325354
//
326355
// Default policy is to pick healthy (non-negative score) backend in
@@ -351,35 +380,54 @@ func (list *List[T]) Pick() (T, error) { //nolint:ireturn
351380
func (list *List[T]) healthcheck(ctx context.Context) {
352381
defer list.healthWg.Done()
353382

354-
ticker := time.NewTicker(list.healthcheckInterval)
355-
defer ticker.Stop()
356-
357-
for {
358-
list.mu.Lock()
359-
backends := xslices.Map(list.nodes, func(n node[T]) T { return n.backend })
360-
list.mu.Unlock()
383+
// run the initial health check immediately
384+
list.doHealthCheck(ctx)
361385

362-
for _, backend := range backends {
363-
if ctx.Err() != nil {
364-
return
365-
}
386+
close(list.initialHealthcheckDoneCh)
366387

367-
func() {
368-
localCtx, ctxCancel := context.WithTimeout(ctx, list.healthcheckTimeout)
369-
defer ctxCancel()
388+
initialInterval := list.healthcheckInterval
389+
if list.healthcheckJitter > 0 {
390+
// jitter is enabled - stagger the second health check by setting the first wait time to a random duration between 0 and the full interval
391+
initialInterval = time.Duration(rand.Float64() * float64(list.healthcheckInterval))
392+
}
370393

371-
if newTier, err := backend.HealthCheck(localCtx); err != nil {
372-
list.downWithTier(backend, newTier)
373-
} else {
374-
list.upWithTier(backend, newTier)
375-
}
376-
}()
377-
}
394+
timer := time.NewTimer(initialInterval)
395+
defer timer.Stop()
378396

397+
for {
379398
select {
380399
case <-ctx.Done():
381400
return
382-
case <-ticker.C:
401+
case <-timer.C:
402+
}
403+
404+
list.doHealthCheck(ctx)
405+
406+
nextInterval := time.Duration(((rand.Float64()*2-1)*list.healthcheckJitter + 1.0) * float64(list.healthcheckInterval))
407+
408+
timer.Reset(nextInterval)
409+
}
410+
}
411+
412+
func (list *List[T]) doHealthCheck(ctx context.Context) {
413+
list.mu.Lock()
414+
backends := xslices.Map(list.nodes, func(n node[T]) T { return n.backend })
415+
list.mu.Unlock()
416+
417+
for _, backend := range backends {
418+
if ctx.Err() != nil {
419+
return
383420
}
421+
422+
func() {
423+
localCtx, ctxCancel := context.WithTimeout(ctx, list.healthcheckTimeout)
424+
defer ctxCancel()
425+
426+
if newTier, err := backend.HealthCheck(localCtx); err != nil {
427+
list.downWithTier(backend, newTier)
428+
} else {
429+
list.upWithTier(backend, newTier)
430+
}
431+
}()
384432
}
385433
}

upstream/upstream_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func (suite *ListSuite) TestEmpty() {
4848

4949
defer l.Shutdown()
5050

51+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
52+
defer cancel()
53+
54+
suite.Require().NoError(l.WaitForInitialHealthcheck(ctx), "initial healthcheck should be done immediately")
55+
5156
backend, err := l.Pick()
5257
suite.Assert().Zero(backend)
5358
suite.Assert().EqualError(err, "no upstreams available")

0 commit comments

Comments
 (0)