Skip to content

Commit 0869be7

Browse files
committed
Prevent failed ingestion from affecting rate limiting in distributor.
Change the handling of rate limiting in the distributor so that when ingestion fails, the reservation in the rate limiter is canceled, returning the tokens for use by subsequent requests. Signed-off-by: Steve Simpson <[email protected]>
1 parent 92e9047 commit 0869be7

File tree

5 files changed

+117
-20
lines changed

5 files changed

+117
-20
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* `cortex_ha_tracker_replicas_cleanup_deleted_total`
5151
* `cortex_ha_tracker_replicas_cleanup_delete_failed_total`
5252
* [ENHANCEMENT] Tenant deletion endpoints now support deletion of ruler groups. This only works when using rule store that supports deletion. #3750
53+
* [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825
5354
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
5455
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
5556
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761

pkg/distributor/distributor.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
580580
}
581581

582582
totalN := validatedSamples + len(validatedMetadata)
583-
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
583+
rateReservation := d.ingestionRateLimiter.AllowN(now, userID, totalN)
584+
if !rateReservation.OK() {
584585
// Ensure the request slice is reused if the request is rate limited.
585586
ingester_client.ReuseSlice(req.Timeseries)
586587

@@ -632,6 +633,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
632633
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
633634
}, func() { ingester_client.ReuseSlice(req.Timeseries) })
634635
if err != nil {
636+
// Ingestion failed, so roll-back the reservation from the rate limiter.
637+
rateReservation.CancelAt(now)
635638
return nil, err
636639
}
637640
return &ingester_client.WriteResponse{}, firstPartialErr

pkg/distributor/distributor_test.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
369369
ingestionRateStrategy string
370370
ingestionRate float64
371371
ingestionBurstSize int
372+
ingestionFailing bool
372373
pushes []testPush
373374
}{
374375
"local strategy: limit should be set to each distributor": {
@@ -413,6 +414,17 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
413414
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
414415
},
415416
},
417+
"unhappy ingestors: rate limit should be unaffected when ingestion fails": {
418+
distributors: 1,
419+
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
420+
ingestionRate: 10,
421+
ingestionBurstSize: 10,
422+
ingestionFailing: true,
423+
pushes: []testPush{
424+
{samples: 10, expectedError: errFail},
425+
{samples: 10, expectedError: errFail},
426+
},
427+
},
416428
}
417429

418430
for testName, testData := range tests {
@@ -425,10 +437,15 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
425437
limits.IngestionRate = testData.ingestionRate
426438
limits.IngestionBurstSize = testData.ingestionBurstSize
427439

440+
happyIngesters := 3
441+
if testData.ingestionFailing {
442+
happyIngesters = 0
443+
}
444+
428445
// Start all expected distributors
429446
distributors, _, r := prepare(t, prepConfig{
430447
numIngesters: 3,
431-
happyIngesters: 3,
448+
happyIngesters: happyIngesters,
432449
numDistributors: testData.distributors,
433450
shardByAllLabels: true,
434451
limits: limits,

pkg/util/limiter/rate_limiter.go

+43-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ type RateLimiter struct {
2626
tenants map[string]*tenantLimiter
2727
}
2828

29+
// Reservation encapsulates rate.Reservation to exclude interfaces which do not
30+
// make sense to expose, because we are following the semantics of AllowN, being
31+
// an immediate reservation, i.e. not delayed into the future.
32+
type Reservation struct {
33+
ok bool
34+
reservation *rate.Reservation
35+
}
36+
2937
type tenantLimiter struct {
3038
limiter *rate.Limiter
3139
recheckAt time.Time
@@ -42,9 +50,29 @@ func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *
4250
}
4351
}
4452

45-
// AllowN reports whether n tokens may be consumed happen at time now.
46-
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
47-
return l.getTenantLimiter(now, tenantID).AllowN(now, n)
53+
// AllowN reports whether n tokens may be consumed at time now. The
54+
// reservation of tokens can be canceled using CancelAt on the returned object.
55+
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) *Reservation {
56+
57+
// Using ReserveN allows cancellation of the reservation, but
58+
// the semantics are subtly different to AllowN.
59+
r := l.getTenantLimiter(now, tenantID).ReserveN(now, n)
60+
if !r.OK() {
61+
return &Reservation{false, r}
62+
}
63+
64+
// ReserveN will still return OK if the necessary tokens are
65+
// available in the future, and tells us this time delay. In
66+
// order to mimic the semantics of AllowN, we must check that
67+
// there is no delay before we can use them.
68+
if r.DelayFrom(now) > time.Duration(0) {
69+
// Having decided not to use the reservation, return the
70+
// tokens to the rate limiter.
71+
r.CancelAt(now)
72+
return &Reservation{false, r}
73+
}
74+
75+
return &Reservation{true, r}
4876
}
4977

5078
// Limit returns the currently configured maximum overall tokens rate.
@@ -120,3 +148,15 @@ func (l *RateLimiter) recheckTenantLimiter(now time.Time, tenantID string) *rate
120148

121149
return entry.limiter
122150
}
151+
152+
// OK indicates whether the request to the rate limiter was allowed.
153+
func (a *Reservation) OK() bool {
154+
return a.ok
155+
}
156+
157+
// CancelAt returns the reservation to the rate limiter for use by other requests.
158+
// Note that typically the reservation should be canceled with the same timestamp
159+
// it was requested with, or not all the tokens consumed will be returned.
160+
func (a *Reservation) CancelAt(now time.Time) {
161+
a.reservation.CancelAt(now)
162+
}

pkg/util/limiter/rate_limiter_test.go

+51-15
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,60 @@ func TestRateLimiter_AllowN(t *testing.T) {
4545
now := time.Now()
4646

4747
// Tenant #1
48-
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
49-
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
50-
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
51-
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))
48+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8).OK())
49+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10).OK())
50+
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3).OK())
51+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2).OK())
5252

53-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
54-
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
55-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))
53+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8).OK())
54+
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3).OK())
55+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2).OK())
5656

5757
// Tenant #2
58-
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
59-
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
60-
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
61-
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))
62-
63-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
64-
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
65-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
58+
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18).OK())
59+
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20).OK())
60+
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3).OK())
61+
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2).OK())
62+
63+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18).OK())
64+
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3).OK())
65+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2).OK())
66+
}
67+
68+
func TestRateLimiter_AllowNCancelation(t *testing.T) {
69+
strategy := &staticLimitStrategy{tenants: map[string]struct {
70+
limit float64
71+
burst int
72+
}{
73+
"tenant-1": {limit: 10, burst: 20},
74+
}}
75+
76+
limiter := NewRateLimiter(strategy, 10*time.Second)
77+
now := time.Now()
78+
79+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 12).OK())
80+
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 9).OK())
81+
82+
r1 := limiter.AllowN(now, "tenant-1", 8)
83+
assert.Equal(t, true, r1.OK())
84+
r1.CancelAt(now)
85+
86+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8).OK())
87+
88+
// +10 tokens (1s)
89+
nowPlus := now.Add(time.Second)
90+
91+
assert.Equal(t, true, limiter.AllowN(nowPlus, "tenant-1", 6).OK())
92+
assert.Equal(t, false, limiter.AllowN(nowPlus, "tenant-1", 5).OK())
93+
94+
r2 := limiter.AllowN(nowPlus, "tenant-1", 4)
95+
assert.Equal(t, true, r2.OK())
96+
r2.CancelAt(nowPlus)
97+
98+
assert.Equal(t, true, limiter.AllowN(nowPlus, "tenant-1", 2).OK())
99+
assert.Equal(t, false, limiter.AllowN(nowPlus, "tenant-1", 3).OK())
100+
assert.Equal(t, true, limiter.AllowN(nowPlus, "tenant-1", 2).OK())
101+
assert.Equal(t, false, limiter.AllowN(nowPlus, "tenant-1", 1).OK())
66102
}
67103

68104
func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {

0 commit comments

Comments
 (0)