Skip to content

Commit 9ba848b

Browse files
authored
Prevent failed ingestion from affecting rate limiting in distributor. (cortexproject#3825)
* Prevent failed ingestion from affecting rate limiting in distributor. Change the handling of the rate limiting in the distributor such that when the ingestion fails, the reservation in the rate limiter is canceled, returning the tokens for future use in the proceeding requests. Signed-off-by: Steve Simpson <[email protected]> * Spelling and typos. Signed-off-by: Steve Simpson <[email protected]> * Change Reservation to an interface to avoid extra allocation. Signed-off-by: Steve Simpson <[email protected]> * Return nil on failure in AllowN. Signed-off-by: Steve Simpson <[email protected]> * Rename unit test helper OK to isOK. Signed-off-by: Steve Simpson <[email protected]> * Review suggestions. Signed-off-by: Steve Simpson <[email protected]>
1 parent e10ccda commit 9ba848b

File tree

5 files changed

+112
-20
lines changed

5 files changed

+112
-20
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
* `cortex_ha_tracker_replicas_cleanup_deleted_total`
5353
* `cortex_ha_tracker_replicas_cleanup_delete_failed_total`
5454
* [ENHANCEMENT] Tenant deletion endpoints now support deletion of ruler groups. This only works when using rule store that supports deletion. #3750
55+
* [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825
5556
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages where not logged. #3778
5657
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
5758
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761

pkg/distributor/distributor.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
580580
}
581581

582582
totalN := validatedSamples + len(validatedMetadata)
583-
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
583+
rateOK, rateReservation := d.ingestionRateLimiter.AllowN(now, userID, totalN)
584+
if !rateOK {
584585
// Ensure the request slice is reused if the request is rate limited.
585586
ingester_client.ReuseSlice(req.Timeseries)
586587

@@ -632,6 +633,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
632633
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
633634
}, func() { ingester_client.ReuseSlice(req.Timeseries) })
634635
if err != nil {
636+
// Ingestion failed, so roll-back the reservation from the rate limiter.
637+
rateReservation.CancelAt(now)
635638
return nil, err
636639
}
637640
return &ingester_client.WriteResponse{}, firstPartialErr

pkg/distributor/distributor_test.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
369369
ingestionRateStrategy string
370370
ingestionRate float64
371371
ingestionBurstSize int
372+
ingestionFailing bool
372373
pushes []testPush
373374
}{
374375
"local strategy: limit should be set to each distributor": {
@@ -413,6 +414,17 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
413414
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
414415
},
415416
},
417+
"unhappy ingesters: rate limit should be unaffected when ingestion fails": {
418+
distributors: 1,
419+
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
420+
ingestionRate: 10,
421+
ingestionBurstSize: 10,
422+
ingestionFailing: true,
423+
pushes: []testPush{
424+
{samples: 10, expectedError: errFail},
425+
{samples: 10, expectedError: errFail},
426+
},
427+
},
416428
}
417429

418430
for testName, testData := range tests {
@@ -425,10 +437,15 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
425437
limits.IngestionRate = testData.ingestionRate
426438
limits.IngestionBurstSize = testData.ingestionBurstSize
427439

440+
happyIngesters := 3
441+
if testData.ingestionFailing {
442+
happyIngesters = 0
443+
}
444+
428445
// Start all expected distributors
429446
distributors, _, r := prepare(t, prepConfig{
430447
numIngesters: 3,
431-
happyIngesters: 3,
448+
happyIngesters: happyIngesters,
432449
numDistributors: testData.distributors,
433450
shardByAllLabels: true,
434451
limits: limits,

pkg/util/limiter/rate_limiter.go

+34-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ type RateLimiter struct {
2626
tenants map[string]*tenantLimiter
2727
}
2828

29+
// Reservation is similar to rate.Reservation but excludes interfaces which do
30+
// not make sense to expose, because we are following the semantics of AllowN,
31+
// being an immediate reservation, i.e. not delayed into the future.
32+
type Reservation interface {
33+
// CancelAt returns the reservation to the rate limiter for use by other
34+
// requests. Note that typically the reservation should be canceled with
35+
// the same timestamp it was requested with, or not all the tokens
36+
// consumed will be returned.
37+
CancelAt(now time.Time)
38+
}
39+
2940
type tenantLimiter struct {
3041
limiter *rate.Limiter
3142
recheckAt time.Time
@@ -42,9 +53,29 @@ func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *
4253
}
4354
}
4455

45-
// AllowN reports whether n tokens may be consumed happen at time now.
46-
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
47-
return l.getTenantLimiter(now, tenantID).AllowN(now, n)
56+
// AllowN reports whether n tokens may be consumed happen at time now. The
57+
// reservation of tokens can be canceled using CancelAt on the returned object.
58+
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) (bool, Reservation) {
59+
60+
// Using ReserveN allows cancellation of the reservation, but
61+
// the semantics are subtly different to AllowN.
62+
r := l.getTenantLimiter(now, tenantID).ReserveN(now, n)
63+
if !r.OK() {
64+
return false, nil
65+
}
66+
67+
// ReserveN will still return OK if the necessary tokens are
68+
// available in the future, and tells us this time delay. In
69+
// order to mimic the semantics of AllowN, we must check that
70+
// there is no delay before we can use them.
71+
if r.DelayFrom(now) > 0 {
72+
// Having decided not to use the reservation, return the
73+
// tokens to the rate limiter.
74+
r.CancelAt(now)
75+
return false, nil
76+
}
77+
78+
return true, r
4879
}
4980

5081
// Limit returns the currently configured maximum overall tokens rate.

pkg/util/limiter/rate_limiter_test.go

+55-15
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,60 @@ func TestRateLimiter_AllowN(t *testing.T) {
4545
now := time.Now()
4646

4747
// Tenant #1
48-
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
49-
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
50-
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
51-
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))
48+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
49+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 10)))
50+
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 3)))
51+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 2)))
5252

53-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
54-
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
55-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))
53+
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 8)))
54+
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 3)))
55+
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 2)))
5656

5757
// Tenant #2
58-
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
59-
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
60-
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
61-
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))
62-
63-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
64-
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
65-
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
58+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 18)))
59+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 20)))
60+
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-2", 3)))
61+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 2)))
62+
63+
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 18)))
64+
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 3)))
65+
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 2)))
66+
}
67+
68+
func TestRateLimiter_AllowNCancelation(t *testing.T) {
69+
strategy := &staticLimitStrategy{tenants: map[string]struct {
70+
limit float64
71+
burst int
72+
}{
73+
"tenant-1": {limit: 10, burst: 20},
74+
}}
75+
76+
limiter := NewRateLimiter(strategy, 10*time.Second)
77+
now := time.Now()
78+
79+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 12)))
80+
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 9)))
81+
82+
ok1, r1 := limiter.AllowN(now, "tenant-1", 8)
83+
assert.Equal(t, true, ok1)
84+
r1.CancelAt(now)
85+
86+
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
87+
88+
// +10 tokens (1s)
89+
nowPlus := now.Add(time.Second)
90+
91+
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 6)))
92+
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 5)))
93+
94+
ok2, r2 := limiter.AllowN(nowPlus, "tenant-1", 4)
95+
assert.Equal(t, true, ok2)
96+
r2.CancelAt(nowPlus)
97+
98+
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
99+
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 3)))
100+
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
101+
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 1)))
66102
}
67103

68104
func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {
@@ -127,3 +163,7 @@ func (s *staticLimitStrategy) Burst(tenantID string) int {
127163

128164
return tenant.burst
129165
}
166+
167+
func isOK(ok bool, r Reservation) bool {
168+
return ok
169+
}

0 commit comments

Comments
 (0)