Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent failed ingestion from affecting rate limiting in distributor. #3825

Merged
merged 6 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* `cortex_ha_tracker_replicas_cleanup_deleted_total`
* `cortex_ha_tracker_replicas_cleanup_delete_failed_total`
* [ENHANCEMENT] Tenant deletion endpoints now support deletion of ruler groups. This only works when using a rule store that supports deletion. #3750
* [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track an intentionally aborted CAS operation as an error in the `cortex_kv_request_duration_seconds` metric. #3745
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
Expand Down
5 changes: 4 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
}

totalN := validatedSamples + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
rateOK, rateReservation := d.ingestionRateLimiter.AllowN(now, userID, totalN)
if !rateOK {
// Ensure the request slice is reused if the request is rate limited.
ingester_client.ReuseSlice(req.Timeseries)

Expand Down Expand Up @@ -632,6 +633,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
}, func() { ingester_client.ReuseSlice(req.Timeseries) })
if err != nil {
// Ingestion failed, so roll-back the reservation from the rate limiter.
rateReservation.CancelAt(now)
return nil, err
}
return &ingester_client.WriteResponse{}, firstPartialErr
Expand Down
19 changes: 18 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionRateStrategy string
ingestionRate float64
ingestionBurstSize int
ingestionFailing bool
pushes []testPush
}{
"local strategy: limit should be set to each distributor": {
Expand Down Expand Up @@ -413,6 +414,17 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
},
},
"unhappy ingesters: rate limit should be unaffected when ingestion fails": {
distributors: 1,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 10,
ingestionFailing: true,
pushes: []testPush{
{samples: 10, expectedError: errFail},
{samples: 10, expectedError: errFail},
},
},
}

for testName, testData := range tests {
Expand All @@ -425,10 +437,15 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
limits.IngestionRate = testData.ingestionRate
limits.IngestionBurstSize = testData.ingestionBurstSize

happyIngesters := 3
if testData.ingestionFailing {
happyIngesters = 0
}

// Start all expected distributors
distributors, _, r := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
happyIngesters: happyIngesters,
numDistributors: testData.distributors,
shardByAllLabels: true,
limits: limits,
Expand Down
37 changes: 34 additions & 3 deletions pkg/util/limiter/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ type RateLimiter struct {
tenants map[string]*tenantLimiter
}

// Reservation is a handle on tokens that AllowN has already granted. It is
// modelled on rate.Reservation but exposes cancellation only: AllowN grants
// tokens immediately rather than at some point in the future, so the rest of
// the rate.Reservation surface would not make sense here.
type Reservation interface {
	// CancelAt gives the reserved tokens back to the rate limiter so that
	// other requests can use them. Callers should normally pass the same
	// timestamp the reservation was requested with; with a different
	// timestamp, not all of the consumed tokens will be returned.
	CancelAt(now time.Time)
}

type tenantLimiter struct {
limiter *rate.Limiter
recheckAt time.Time
Expand All @@ -42,9 +53,29 @@ func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *
}
}

// AllowN reports whether n tokens may be consumed happen at time now.
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
return l.getTenantLimiter(now, tenantID).AllowN(now, n)
// AllowN reports whether n tokens may be consumed by tenantID at time now.
// When the answer is true, the second result is a Reservation whose CancelAt
// method returns the tokens to the limiter; when the answer is false, the
// second result is nil.
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) (bool, Reservation) {
	// ReserveN is used instead of the limiter's own AllowN because a
	// reservation can be canceled later; its semantics differ slightly,
	// which the cases below compensate for.
	reservation := l.getTenantLimiter(now, tenantID).ReserveN(now, n)

	switch {
	case !reservation.OK():
		// The limiter refused the reservation outright.
		return false, nil

	case reservation.DelayFrom(now) > 0:
		// ReserveN also reports OK when the tokens only become available
		// later, and says how long to wait. AllowN semantics require the
		// tokens to be usable right now, so decline and give the tokens
		// back to the rate limiter.
		reservation.CancelAt(now)
		return false, nil

	default:
		// Tokens are available immediately.
		return true, reservation
	}
}

// Limit returns the currently configured maximum overall tokens rate.
Expand Down
70 changes: 55 additions & 15 deletions pkg/util/limiter/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,60 @@ func TestRateLimiter_AllowN(t *testing.T) {
now := time.Now()

// Tenant #1
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 10)))
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 2)))

assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 8)))
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 2)))

// Tenant #2
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))

assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 18)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 20)))
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-2", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 2)))

assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 18)))
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 2)))
}

// TestRateLimiter_AllowNCancelation verifies that canceling a granted
// reservation makes its tokens available to subsequent AllowN calls, both
// within the initial burst and after the bucket has been partly refilled.
func TestRateLimiter_AllowNCancelation(t *testing.T) {
	const tenant = "tenant-1"

	strategy := &staticLimitStrategy{tenants: map[string]struct {
		limit float64
		burst int
	}{
		tenant: {limit: 10, burst: 20},
	}}

	rl := NewRateLimiter(strategy, 10*time.Second)
	start := time.Now()

	// allowed keeps only the boolean verdict of AllowN.
	allowed := func(at time.Time, n int) bool {
		ok, _ := rl.AllowN(at, tenant, n)
		return ok
	}

	// Burst is 20: taking 12 leaves 8, so a request for 9 is rejected.
	assert.True(t, allowed(start, 12))
	assert.False(t, allowed(start, 9))

	// Take the remaining 8 tokens and hand them straight back.
	ok, res := rl.AllowN(start, tenant, 8)
	assert.True(t, ok)
	res.CancelAt(start)

	// The canceled tokens can be taken again.
	assert.True(t, allowed(start, 8))

	// +10 tokens (1s)
	later := start.Add(time.Second)

	// 6 of the 10 refilled tokens are taken, leaving 4.
	assert.True(t, allowed(later, 6))
	assert.False(t, allowed(later, 5))

	// Reserve the last 4 and cancel them again.
	ok, res = rl.AllowN(later, tenant, 4)
	assert.True(t, ok)
	res.CancelAt(later)

	// The 4 returned tokens are spent in two steps of 2; nothing beyond
	// that is allowed.
	assert.True(t, allowed(later, 2))
	assert.False(t, allowed(later, 3))
	assert.True(t, allowed(later, 2))
	assert.False(t, allowed(later, 1))
}

func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {
Expand Down Expand Up @@ -127,3 +163,7 @@ func (s *staticLimitStrategy) Burst(tenantID string) int {

return tenant.burst
}

// isOK adapts the two-value result of RateLimiter.AllowN for use inside a
// single-value assertion: it returns the allowed flag unchanged and discards
// the reservation.
func isOK(ok bool, _ Reservation) bool {
	return ok
}