Skip to content

Add option to limit concurrent queries to Cassandra. #2562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [FEATURE] Experimental: Added a new object storage client for OpenStack Swift. #2440
* [FEATURE] Update in dependency `weaveworks/common`. TLS config options added to the Server. #2535
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.limit-query-concurrency` flag. #2562
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,10 @@ cassandra:
# CLI flag: -cassandra.retry-min-backoff
[retry_min_backoff: <duration> | default = 100ms]

# Limit number of concurrent queries to Cassandra. (Default is 0: no limit)
# CLI flag: -cassandra.limit-query-concurrency
[limit_query_concurrency: <int> | default = 0]

boltdb:
# Location of BoltDB index files.
# CLI flag: -boltdb.dir
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
go.uber.org/atomic v1.5.1
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/api v0.14.0
google.golang.org/grpc v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
36 changes: 30 additions & 6 deletions pkg/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
Expand Down Expand Up @@ -40,6 +41,7 @@ type Config struct {
Retries int `yaml:"max_retries"`
MaxBackoff time.Duration `yaml:"retry_max_backoff"`
MinBackoff time.Duration `yaml:"retry_min_backoff"`
LimitQueryConcurrency int `yaml:"limit_query_concurrency"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -63,6 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. (Default is 0: no retries)")
f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request. (Default = 100ms)")
f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request. (Default = 10s)")
f.IntVar(&cfg.LimitQueryConcurrency, "cassandra.limit-query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -192,9 +195,10 @@ func (cfg *Config) createKeyspace() error {

// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra.
type StorageClient struct {
cfg Config
schemaCfg chunk.SchemaConfig
session *gocql.Session
cfg Config
schemaCfg chunk.SchemaConfig
session *gocql.Session
querySemaphore *semaphore.Weighted
}

// NewStorageClient returns a new StorageClient.
Expand All @@ -206,10 +210,16 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient,
return nil, errors.WithStack(err)
}

var querySemaphore *semaphore.Weighted
if cfg.LimitQueryConcurrency > 0 {
querySemaphore = semaphore.NewWeighted(int64(cfg.LimitQueryConcurrency))
}

client := &StorageClient{
cfg: cfg,
schemaCfg: schemaCfg,
session: session,
cfg: cfg,
schemaCfg: schemaCfg,
session: session,
querySemaphore: querySemaphore,
}
return client, nil
}
Expand Down Expand Up @@ -277,6 +287,13 @@ func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQue
}

func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error {
if s.querySemaphore != nil {
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer s.querySemaphore.Release(1)
}

var q *gocql.Query

switch {
Expand Down Expand Up @@ -383,6 +400,13 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c
}

func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
if s.querySemaphore != nil {
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
return input, err
}
defer s.querySemaphore.Release(1)
}

tableName, err := s.schemaCfg.ChunkTableFor(input.From)
if err != nil {
return input, err
Expand Down
136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/semaphore/semaphore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.