Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Store Gateway] Token bucket limiter #6016

Merged
merged 28 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2ad976a
Create TokenBucket
justinjung04 Jun 4, 2024
917d2e9
Update bucket stores to pass token bucket
justinjung04 Jun 13, 2024
6700b08
Move limiters to a new file
justinjung04 Jun 13, 2024
5e78511
Added tests for limiters and token bucket
justinjung04 Jun 13, 2024
8ff710e
Add more tests
justinjung04 Jun 14, 2024
6c15c92
Added enable flag
justinjung04 Jun 14, 2024
6ae585a
Add dryrun feature
justinjung04 Jun 14, 2024
bd8b9e7
Add doc
justinjung04 Jun 14, 2024
f66aa50
Add changelog
justinjung04 Jun 14, 2024
8940823
Lint
justinjung04 Jun 14, 2024
7054778
Do not create pod token bucket if the feature is not enabled
justinjung04 Jun 14, 2024
cf39c55
More docs
justinjung04 Jun 14, 2024
c944474
Address comments
justinjung04 Jun 20, 2024
4ed7bcb
Rename podTokenBucket to instanceTokenBucket
justinjung04 Jun 20, 2024
3219a44
Updated default values
justinjung04 Jun 26, 2024
52518a1
Rename TokenBucketLimiter to TokenBucketBytesLimiter
justinjung04 Jun 27, 2024
c5e6968
Changed error to httpgrpc
justinjung04 Jun 27, 2024
d0c4fbd
Nit
justinjung04 Jul 2, 2024
dff09ce
Increment failure metric when token bucket returns error
justinjung04 Jul 2, 2024
8467087
Simplify token bucket by making Retrieve to always deduct token
justinjung04 Jul 11, 2024
54c4e49
Throw 429 and 422 for different failure scenarios
justinjung04 Jul 11, 2024
26dc6f4
Hide token factors from doc
justinjung04 Jul 12, 2024
78a856e
Simplified config by combining dryrun and enabled
justinjung04 Jul 12, 2024
3743da2
Merge branch 'master' into sg-token-bucket
justinjung04 Jul 12, 2024
c7f6191
Remove test log
justinjung04 Jul 12, 2024
faba9b8
Fix tests
justinjung04 Jul 12, 2024
172a62d
Fix
justinjung04 Jul 12, 2024
5cbd557
Merge branch 'master' into sg-token-bucket
justinjung04 Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [FEATURE] Store Gateway: Token bucket limiter. #6016
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
18 changes: 18 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,24 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.series-batch-size
[series_batch_size: <int> | default = 10000]

token_bucket_bytes_limiter:
# Token bucket bytes limiter mode. Supported values are: disabled, dryrun,
# enabled
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode
[mode: <string> | default = "disabled"]

# Instance token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size
[instance_token_bucket_size: <int> | default = 859832320]

# User token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size
[user_token_bucket_size: <int> | default = 644874240]

# Request token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
[request_token_bucket_size: <int> | default = 4194304]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
18 changes: 18 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,24 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.series-batch-size
[series_batch_size: <int> | default = 10000]

token_bucket_bytes_limiter:
# Token bucket bytes limiter mode. Supported values are: disabled, dryrun,
# enabled
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode
[mode: <string> | default = "disabled"]

# Instance token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size
[instance_token_bucket_size: <int> | default = 859832320]

# User token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size
[user_token_bucket_size: <int> | default = 644874240]

# Request token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
[request_token_bucket_size: <int> | default = 4194304]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
18 changes: 18 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,24 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.series-batch-size
[series_batch_size: <int> | default = 10000]

token_bucket_bytes_limiter:
# Token bucket bytes limiter mode. Supported values are: disabled, dryrun,
# enabled
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode
[mode: <string> | default = "disabled"]

# Instance token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size
[instance_token_bucket_size: <int> | default = 859832320]

# User token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size
[user_token_bucket_size: <int> | default = 644874240]

# Request token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
[request_token_bucket_size: <int> | default = 4194304]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
45 changes: 45 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"flag"
"fmt"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -52,6 +53,7 @@ var (

ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled")
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -292,6 +294,22 @@ type BucketStoreConfig struct {

// Controls how many series to fetch per batch in Store Gateway. Default value is 10000.
SeriesBatchSize int `yaml:"series_batch_size"`

// Token bucket configs
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
}

type TokenBucketBytesLimiterConfig struct {
Mode string `yaml:"mode"`
InstanceTokenBucketSize int64 `yaml:"instance_token_bucket_size"`
UserTokenBucketSize int64 `yaml:"user_token_bucket_size"`
RequestTokenBucketSize int64 `yaml:"request_token_bucket_size"`
FetchedPostingsTokenFactor float64 `yaml:"fetched_postings_token_factor" doc:"hidden"`
TouchedPostingsTokenFactor float64 `yaml:"touched_postings_token_factor" doc:"hidden"`
FetchedSeriesTokenFactor float64 `yaml:"fetched_series_token_factor" doc:"hidden"`
TouchedSeriesTokenFactor float64 `yaml:"touched_series_token_factor" doc:"hidden"`
FetchedChunksTokenFactor float64 `yaml:"fetched_chunks_token_factor" doc:"hidden"`
TouchedChunksTokenFactor float64 `yaml:"touched_chunks_token_factor" doc:"hidden"`
}

// RegisterFlags registers the BucketStore flags
Expand Down Expand Up @@ -325,6 +343,16 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.")
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", ")))
f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size")
f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
f.Int64Var(&cfg.TokenBucketBytesLimiter.RequestTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size", int64(4*units.Mebibyte), "Request token bucket size")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-postings-token-factor", 0, "Multiplication factor used for fetched postings token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-postings-token-factor", 5, "Multiplication factor used for touched postings token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-series-token-factor", 0, "Multiplication factor used for fetched series token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-series-token-factor", 25, "Multiplication factor used for touched series token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
}

// Validate the config.
Expand All @@ -344,6 +372,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
return ErrInvalidBucketIndexBlockDiscoveryStrategy
}
if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) {
return ErrInvalidTokenBucketBytesLimiterMode
}
return nil
}

Expand Down Expand Up @@ -375,3 +406,17 @@ var supportedBlockDiscoveryStrategies = []string{
string(RecursiveDiscovery),
string(BucketIndexDiscovery),
}

type TokenBucketBytesLimiterMode string

const (
TokenBucketBytesLimiterDisabled TokenBucketBytesLimiterMode = "disabled"
TokenBucketBytesLimiterDryRun TokenBucketBytesLimiterMode = "dryrun"
TokenBucketBytesLimiterEnabled TokenBucketBytesLimiterMode = "enabled"
)

var supportedTokenBucketBytesLimiterModes = []string{
string(TokenBucketBytesLimiterDisabled),
string(TokenBucketBytesLimiterDryRun),
string(TokenBucketBytesLimiterEnabled),
}
101 changes: 52 additions & 49 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -35,6 +34,7 @@ import (

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/backoff"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
util_log "github.com/cortexproject/cortex/pkg/util/log"
Expand Down Expand Up @@ -73,6 +73,11 @@ type BucketStores struct {
storesErrorsMu sync.RWMutex
storesErrors map[string]error

instanceTokenBucket *util.TokenBucket
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The bucket_stores.go can have very minimal changes if all the code can be moved to the limiter.go with another layer of abstraction.


userTokenBucketsMu sync.RWMutex
userTokenBuckets map[string]*util.TokenBucket

// Keeps number of inflight requests
inflightRequestCnt int
inflightRequestMu sync.RWMutex
Expand Down Expand Up @@ -115,6 +120,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_bucket_stores_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Expand Down Expand Up @@ -144,6 +150,13 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
return nil, errors.Wrap(err, "create chunks bytes pool")
}

if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
u.instanceTokenBucket = util.NewTokenBucket(cfg.BucketStore.TokenBucketBytesLimiter.InstanceTokenBucketSize, promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_bucket_stores_instance_token_bucket_remaining",
Help: "Number of tokens left in instance token bucket.",
}))
}

if reg != nil {
reg.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics)
}
Expand Down Expand Up @@ -475,6 +488,12 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
unlockInDefer = false
u.storesMu.Unlock()

if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
u.userTokenBucketsMu.Lock()
delete(u.userTokenBuckets, userID)
u.userTokenBucketsMu.Unlock()
}

u.metaFetcherMetrics.RemoveUserRegistry(userID)
u.bucketStoreMetrics.RemoveUserRegistry(userID)
return bs.Close()
Expand Down Expand Up @@ -612,13 +631,19 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
}

if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
u.userTokenBucketsMu.Lock()
u.userTokenBuckets[userID] = util.NewTokenBucket(u.cfg.BucketStore.TokenBucketBytesLimiter.UserTokenBucketSize, nil)
u.userTokenBucketsMu.Unlock()
}

bs, err := store.NewBucketStore(
userBkt,
fetcher,
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down Expand Up @@ -680,6 +705,31 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str
}
}

func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
u.userTokenBucketsMu.RLock()
defer u.userTokenBucketsMu.RUnlock()
return u.userTokenBuckets[userID]
}

func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
tokensToRetrieve := float64(tokens)
switch dataType {
case store.PostingsFetched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedPostingsTokenFactor
case store.PostingsTouched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedPostingsTokenFactor
case store.SeriesFetched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedSeriesTokenFactor
case store.SeriesTouched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedSeriesTokenFactor
case store.ChunksFetched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedChunksTokenFactor
case store.ChunksTouched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedChunksTokenFactor
}
return int64(tokensToRetrieve)
}

func getUserIDFromGRPCContext(ctx context.Context) string {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
Expand Down Expand Up @@ -730,50 +780,3 @@ type spanSeriesServer struct {
func (s spanSeriesServer) Context() context.Context {
return s.ctx
}

type limiter struct {
limiter *store.Limiter
}

func (c *limiter) Reserve(num uint64) error {
return c.ReserveWithType(num, 0)
}

func (c *limiter) ReserveWithType(num uint64, _ store.StoreDataType) error {
err := c.limiter.Reserve(num)
if err != nil {
return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error())
}

return nil
}

func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) store.ChunksLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter),
}
}
}

func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) store.SeriesLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter),
}
}
}

func newBytesLimiterFactory(limits *validation.Overrides, userID string) store.BytesLimiterFactory {
return func(failedCounter prometheus.Counter) store.BytesLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxDownloadedBytesPerRequest(userID)), failedCounter),
}
}
}
Loading
Loading