Skip to content

Commit 4aa2783

Browse files
pstibranypracucci
andauthored
Prepare Cortex 1.8-rc.1 (#3953)
* Revert "Prevent failed ingestion from affecting rate limiting in distributor. (#3825)" (#3948) This reverts commit 9ba848b. Signed-off-by: Marco Pracucci <[email protected]> (cherry picked from commit 6aa2a69) * Update VERSION Signed-off-by: Peter Štibraný <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent da31295 commit 4aa2783

File tree

6 files changed

+25
-112
lines changed

6 files changed

+25
-112
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 1.8.0-rc.1 / 2021-03-15
4+
5+
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
6+
37
## 1.8.0-rc.0 / 2021-03-08
48

59
* [CHANGE] Alertmanager: Don't expose cluster information to tenants via the `/alertmanager/api/v1/status` API endpoint when operating with clustering enabled. #3903

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.8.0-rc.0
1+
1.8.0-rc.1

pkg/distributor/distributor.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -587,8 +587,7 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
587587
}
588588

589589
totalN := validatedSamples + len(validatedMetadata)
590-
rateOK, rateReservation := d.ingestionRateLimiter.AllowN(now, userID, totalN)
591-
if !rateOK {
590+
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
592591
// Ensure the request slice is reused if the request is rate limited.
593592
ingester_client.ReuseSlice(req.Timeseries)
594593

@@ -640,8 +639,6 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
640639
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
641640
}, func() { ingester_client.ReuseSlice(req.Timeseries) })
642641
if err != nil {
643-
// Ingestion failed, so roll-back the reservation from the rate limiter.
644-
rateReservation.CancelAt(now)
645642
return nil, err
646643
}
647644
return &ingester_client.WriteResponse{}, firstPartialErr

pkg/distributor/distributor_test.go

+1-18
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
363363
ingestionRateStrategy string
364364
ingestionRate float64
365365
ingestionBurstSize int
366-
ingestionFailing bool
367366
pushes []testPush
368367
}{
369368
"local strategy: limit should be set to each distributor": {
@@ -408,17 +407,6 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
408407
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
409408
},
410409
},
411-
"unhappy ingesters: rate limit should be unaffected when ingestion fails": {
412-
distributors: 1,
413-
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
414-
ingestionRate: 10,
415-
ingestionBurstSize: 10,
416-
ingestionFailing: true,
417-
pushes: []testPush{
418-
{samples: 10, expectedError: errFail},
419-
{samples: 10, expectedError: errFail},
420-
},
421-
},
422410
}
423411

424412
for testName, testData := range tests {
@@ -431,15 +419,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
431419
limits.IngestionRate = testData.ingestionRate
432420
limits.IngestionBurstSize = testData.ingestionBurstSize
433421

434-
happyIngesters := 3
435-
if testData.ingestionFailing {
436-
happyIngesters = 0
437-
}
438-
439422
// Start all expected distributors
440423
distributors, _, r, _ := prepare(t, prepConfig{
441424
numIngesters: 3,
442-
happyIngesters: happyIngesters,
425+
happyIngesters: 3,
443426
numDistributors: testData.distributors,
444427
shardByAllLabels: true,
445428
limits: limits,

pkg/util/limiter/rate_limiter.go

+3-34
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,6 @@ type RateLimiter struct {
2626
tenants map[string]*tenantLimiter
2727
}
2828

29-
// Reservation is similar to rate.Reservation but excludes interfaces which do
30-
// not make sense to expose, because we are following the semantics of AllowN,
31-
// being an immediate reservation, i.e. not delayed into the future.
32-
type Reservation interface {
33-
// CancelAt returns the reservation to the rate limiter for use by other
34-
// requests. Note that typically the reservation should be canceled with
35-
// the same timestamp it was requested with, or not all the tokens
36-
// consumed will be returned.
37-
CancelAt(now time.Time)
38-
}
39-
4029
type tenantLimiter struct {
4130
limiter *rate.Limiter
4231
recheckAt time.Time
@@ -53,29 +42,9 @@ func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *
5342
}
5443
}
5544

56-
// AllowN reports whether n tokens may be consumed happen at time now. The
57-
// reservation of tokens can be canceled using CancelAt on the returned object.
58-
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) (bool, Reservation) {
59-
60-
// Using ReserveN allows cancellation of the reservation, but
61-
// the semantics are subtly different to AllowN.
62-
r := l.getTenantLimiter(now, tenantID).ReserveN(now, n)
63-
if !r.OK() {
64-
return false, nil
65-
}
66-
67-
// ReserveN will still return OK if the necessary tokens are
68-
// available in the future, and tells us this time delay. In
69-
// order to mimic the semantics of AllowN, we must check that
70-
// there is no delay before we can use them.
71-
if r.DelayFrom(now) > 0 {
72-
// Having decided not to use the reservation, return the
73-
// tokens to the rate limiter.
74-
r.CancelAt(now)
75-
return false, nil
76-
}
77-
78-
return true, r
45+
// AllowN reports whether n tokens may be consumed happen at time now.
46+
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
47+
return l.getTenantLimiter(now, tenantID).AllowN(now, n)
7948
}
8049

8150
// Limit returns the currently configured maximum overall tokens rate.

pkg/util/limiter/rate_limiter_test.go

+15-55
Original file line numberDiff line numberDiff line change
@@ -45,60 +45,24 @@ func TestRateLimiter_AllowN(t *testing.T) {
4545
now := time.Now()
4646

4747
// Tenant #1
48-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
49-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 10)))
50-
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 3)))
51-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 2)))
48+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
49+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
50+
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
51+
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))
5252

53-
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 8)))
54-
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 3)))
55-
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 2)))
53+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
54+
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
55+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))
5656

5757
// Tenant #2
58-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 18)))
59-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 20)))
60-
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-2", 3)))
61-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 2)))
62-
63-
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 18)))
64-
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 3)))
65-
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 2)))
66-
}
67-
68-
func TestRateLimiter_AllowNCancelation(t *testing.T) {
69-
strategy := &staticLimitStrategy{tenants: map[string]struct {
70-
limit float64
71-
burst int
72-
}{
73-
"tenant-1": {limit: 10, burst: 20},
74-
}}
75-
76-
limiter := NewRateLimiter(strategy, 10*time.Second)
77-
now := time.Now()
78-
79-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 12)))
80-
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 9)))
81-
82-
ok1, r1 := limiter.AllowN(now, "tenant-1", 8)
83-
assert.Equal(t, true, ok1)
84-
r1.CancelAt(now)
85-
86-
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
87-
88-
// +10 tokens (1s)
89-
nowPlus := now.Add(time.Second)
90-
91-
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 6)))
92-
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 5)))
93-
94-
ok2, r2 := limiter.AllowN(nowPlus, "tenant-1", 4)
95-
assert.Equal(t, true, ok2)
96-
r2.CancelAt(nowPlus)
97-
98-
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
99-
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 3)))
100-
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
101-
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 1)))
58+
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
59+
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
60+
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
61+
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))
62+
63+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
64+
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
65+
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
10266
}
10367

10468
func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {
@@ -163,7 +127,3 @@ func (s *staticLimitStrategy) Burst(tenantID string) int {
163127

164128
return tenant.burst
165129
}
166-
167-
func isOK(ok bool, r Reservation) bool {
168-
return ok
169-
}

0 commit comments

Comments
 (0)