Skip to content

Commit 6ae585a

Browse files
committed
Add dryrun feature
Signed-off-by: Justin Jung <[email protected]>
1 parent 6c15c92 commit 6ae585a

File tree

4 files changed

+83
-14
lines changed

4 files changed

+83
-14
lines changed

pkg/storegateway/limiter.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66

77
"github.com/cortexproject/cortex/pkg/storage/tsdb"
88
"github.com/cortexproject/cortex/pkg/util"
9+
util_log "github.com/cortexproject/cortex/pkg/util/log"
910
"github.com/cortexproject/cortex/pkg/util/validation"
11+
"github.com/go-kit/log/level"
1012
"github.com/prometheus/client_golang/prometheus"
1113
"github.com/thanos-io/thanos/pkg/store"
1214
"github.com/weaveworks/common/httpgrpc"
@@ -65,18 +67,36 @@ func (t *tokenBucketLimiter) ReserveWithType(num uint64, dataType store.StoreDat
6567
return nil
6668
}
6769

68-
// if request bucket is running low, check shared buckets
70+
// if we can't retrieve from request bucket, check shared buckets
6971
retrieved = t.userTokenBucket.Retrieve(tokensToRetrieve)
7072
if !retrieved {
73+
// if dry run, force retrieve all tokens and return nil
74+
if t.dryRun {
75+
t.requestTokenBucket.ForceRetrieve(tokensToRetrieve)
76+
t.userTokenBucket.ForceRetrieve(tokensToRetrieve)
77+
t.podTokenBucket.ForceRetrieve(tokensToRetrieve)
78+
level.Warn(util_log.Logger).Log("msg", "not enough tokens in user token bucket", "dataType", dataType, "dataSize", num, "tokens", tokensToRetrieve)
79+
return nil
80+
}
7181
return fmt.Errorf("not enough tokens in user token bucket")
7282
}
7383

7484
retrieved = t.podTokenBucket.Retrieve(tokensToRetrieve)
7585
if !retrieved {
7686
t.userTokenBucket.Refund(tokensToRetrieve)
87+
88+
// if dry run, force retrieve all tokens and return nil
89+
if t.dryRun {
90+
// user bucket is already retrieved
91+
t.requestTokenBucket.ForceRetrieve(tokensToRetrieve)
92+
t.podTokenBucket.ForceRetrieve(tokensToRetrieve)
93+
level.Warn(util_log.Logger).Log("msg", "not enough tokens in pod token bucket", "dataType", dataType, "dataSize", num, "tokens", tokensToRetrieve)
94+
return nil
95+
}
7796
return fmt.Errorf("not enough tokens in pod token bucket")
7897
}
7998

99+
t.requestTokenBucket.ForceRetrieve(tokensToRetrieve)
80100
return nil
81101
}
82102

pkg/storegateway/limiter_test.go

+47-11
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestNewTokenBucketLimiter(t *testing.T) {
3636
podTokenBucket := util.NewTokenBucket(3, 3, nil)
3737
userTokenBucket := util.NewTokenBucket(2, 2, nil)
3838
requestTokenBucket := util.NewTokenBucket(1, 1, nil)
39-
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, true, func(tokens uint64, dataType store.StoreDataType) int64 {
39+
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, false, func(tokens uint64, dataType store.StoreDataType) int64 {
4040
if dataType == store.SeriesFetched {
4141
return int64(tokens) * 5
4242
}
@@ -45,30 +45,66 @@ func TestNewTokenBucketLimiter(t *testing.T) {
4545

4646
// should force retrieve tokens from all buckets upon succeeding
4747
assert.NoError(t, l.ReserveWithType(2, store.PostingsFetched))
48-
assert.False(t, podTokenBucket.Retrieve(2))
48+
assert.Equal(t, int64(1), podTokenBucket.RemainingTokens())
49+
assert.Equal(t, int64(0), userTokenBucket.RemainingTokens())
50+
assert.Equal(t, int64(-1), requestTokenBucket.RemainingTokens())
4951

5052
// should fail if user token bucket is running low
51-
podTokenBucket.Refund(3)
53+
podTokenBucket.Refund(2)
5254
userTokenBucket.Refund(2)
53-
requestTokenBucket.Refund(1)
55+
requestTokenBucket.Refund(2)
5456
assert.ErrorContains(t, l.ReserveWithType(3, store.PostingsFetched), "not enough tokens in user token bucket")
57+
assert.Equal(t, int64(3), podTokenBucket.RemainingTokens())
58+
assert.Equal(t, int64(2), userTokenBucket.RemainingTokens())
59+
assert.Equal(t, int64(1), requestTokenBucket.RemainingTokens())
5560

5661
// should fail if pod token bucket is running low
57-
podTokenBucket.Refund(3)
58-
userTokenBucket.Refund(2)
59-
requestTokenBucket.Refund(1)
60-
podTokenBucket.ForceRetrieve(3)
62+
podTokenBucket.ForceRetrieve(2)
6163
assert.ErrorContains(t, l.ReserveWithType(2, store.PostingsFetched), "not enough tokens in pod token bucket")
64+
assert.Equal(t, int64(1), podTokenBucket.RemainingTokens())
65+
assert.Equal(t, int64(2), userTokenBucket.RemainingTokens())
66+
assert.Equal(t, int64(1), requestTokenBucket.RemainingTokens())
6267

6368
// should retrieve different amount of tokens based on data type
64-
podTokenBucket.Refund(3)
65-
userTokenBucket.Refund(2)
66-
requestTokenBucket.Refund(1)
69+
podTokenBucket.Refund(2)
6770
assert.ErrorContains(t, l.ReserveWithType(1, store.SeriesFetched), "not enough tokens in user token bucket")
71+
assert.Equal(t, int64(3), podTokenBucket.RemainingTokens())
72+
assert.Equal(t, int64(2), userTokenBucket.RemainingTokens())
73+
assert.Equal(t, int64(1), requestTokenBucket.RemainingTokens())
6874

6975
// should always succeed if retrieve token bucket has enough tokens, although shared buckets are empty
7076
podTokenBucket.ForceRetrieve(3)
7177
userTokenBucket.ForceRetrieve(2)
7278
assert.NoError(t, l.ReserveWithType(1, store.PostingsFetched))
79+
assert.Equal(t, int64(-1), podTokenBucket.RemainingTokens())
80+
assert.Equal(t, int64(-1), userTokenBucket.RemainingTokens())
81+
assert.Equal(t, int64(0), requestTokenBucket.RemainingTokens())
7382
}
7483

84+
func TestNewTokenBucketLimter_DryRun(t *testing.T) {
85+
podTokenBucket := util.NewTokenBucket(3, 3, nil)
86+
userTokenBucket := util.NewTokenBucket(2, 2, nil)
87+
requestTokenBucket := util.NewTokenBucket(1, 1, nil)
88+
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, true, func(tokens uint64, dataType store.StoreDataType) int64 {
89+
if dataType == store.SeriesFetched {
90+
return int64(tokens) * 5
91+
}
92+
return int64(tokens)
93+
})
94+
95+
// should force retrieve tokens from all buckets upon succeeding
96+
assert.NoError(t, l.ReserveWithType(2, store.PostingsFetched))
97+
assert.False(t, podTokenBucket.Retrieve(2))
98+
assert.Equal(t, int64(1), podTokenBucket.RemainingTokens())
99+
assert.Equal(t, int64(0), userTokenBucket.RemainingTokens())
100+
assert.Equal(t, int64(-1), requestTokenBucket.RemainingTokens())
101+
102+
// should not fail even if tokens are not enough
103+
podTokenBucket.Refund(2)
104+
userTokenBucket.Refund(2)
105+
requestTokenBucket.Refund(2)
106+
assert.NoError(t, l.ReserveWithType(5, store.PostingsFetched))
107+
assert.Equal(t, int64(-2), podTokenBucket.RemainingTokens())
108+
assert.Equal(t, int64(-3), userTokenBucket.RemainingTokens())
109+
assert.Equal(t, int64(-4), requestTokenBucket.RemainingTokens())
110+
}

pkg/util/token_bucket.go

+9
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ func (t *TokenBucket) Refund(amount int64) {
7474
}
7575
}
7676

77+
func (t *TokenBucket) RemainingTokens() int64 {
78+
t.mu.Lock()
79+
defer t.mu.Unlock()
80+
81+
t.updateTokens()
82+
83+
return t.remainingTokens
84+
}
85+
7786
func (t *TokenBucket) updateTokens() {
7887
now := time.Now()
7988
refilledTokens := int64(now.Sub(t.lastRefill).Seconds() * float64(t.refillRate))

pkg/util/token_bucket_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,25 @@ func TestTokenBucket_Retrieve(t *testing.T) {
1414
assert.False(t, bucket.Retrieve(10))
1515
time.Sleep(time.Second)
1616
assert.True(t, bucket.Retrieve(10))
17+
assert.Equal(t, int64(0), bucket.RemainingTokens())
1718
}
1819

1920
func TestTokenBucket_ForceRetrieve(t *testing.T) {
2021
bucket := NewTokenBucket(10, 600, nil)
2122

2223
bucket.ForceRetrieve(20)
24+
assert.Equal(t, int64(-10), bucket.RemainingTokens())
2325
assert.False(t, bucket.Retrieve(10))
2426
time.Sleep(time.Second)
2527
assert.True(t, bucket.Retrieve(10))
28+
assert.Equal(t, int64(0), bucket.RemainingTokens())
2629
}
2730

2831
func TestTokenBucket_Refund(t *testing.T) {
2932
bucket := NewTokenBucket(10, 600, nil)
3033

31-
bucket.ForceRetrieve(100)
32-
bucket.Refund(100)
34+
bucket.ForceRetrieve(10)
35+
bucket.Refund(20)
3336
assert.True(t, bucket.Retrieve(10))
37+
assert.Equal(t, int64(0), bucket.RemainingTokens())
3438
}

0 commit comments

Comments
 (0)