Skip to content
This repository was archived by the owner on Jan 12, 2023. It is now read-only.

Commit 5bf3618

Browse files
tomwilkiegouthamve
authored andcommitted
Add option to limit concurrent queries to Cassandra. (cortexproject#2562)
* Add option to limit concurrent queries to Cassandra. Signed-off-by: Tom Wilkie <[email protected]> * go mod vendor Signed-off-by: Tom Wilkie <[email protected]> * Add changelog entry. Signed-off-by: Tom Wilkie <[email protected]> * Review feedback. Signed-off-by: Tom Wilkie <[email protected]>
1 parent 9da7483 commit 5bf3618

File tree

6 files changed

+175
-7
lines changed

6 files changed

+175
-7
lines changed

docs/configuration/config-file-reference.md

+4
Original file line numberDiff line numberDiff line change
@@ -1622,6 +1622,10 @@ cassandra:
16221622
# CLI flag: -cassandra.retry-min-backoff
16231623
[retry_min_backoff: <duration> | default = 100ms]
16241624

1625+
# Limit number of concurrent queries to Cassandra. (Default is 0: no limit)
1626+
# CLI flag: -cassandra.query-concurrency
1627+
[query_concurrency: <int> | default = 0]
1628+
16251629
boltdb:
16261630
# Location of BoltDB index files.
16271631
# CLI flag: -boltdb.dir

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ require (
5555
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
5656
go.uber.org/atomic v1.5.0
5757
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
58+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
5859
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
5960
google.golang.org/api v0.14.0
6061
google.golang.org/grpc v1.25.1

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha
918918
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
919919
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
920920
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
921+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
922+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
921923
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
922924
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
923925
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

pkg/chunk/cassandra/storage_client.go

+30-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/gocql/gocql"
1313
"github.com/pkg/errors"
14+
"golang.org/x/sync/semaphore"
1415

1516
"github.com/cortexproject/cortex/pkg/chunk"
1617
"github.com/cortexproject/cortex/pkg/chunk/util"
@@ -39,6 +40,7 @@ type Config struct {
3940
Retries int `yaml:"max_retries"`
4041
MaxBackoff time.Duration `yaml:"retry_max_backoff"`
4142
MinBackoff time.Duration `yaml:"retry_min_backoff"`
43+
QueryConcurrency int `yaml:"query_concurrency"`
4244
}
4345

4446
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -62,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6264
f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. (Default is 0: no retries)")
6365
f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request. (Default = 100ms)")
6466
f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request. (Default = 10s)")
67+
f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
6568
}
6669

6770
func (cfg *Config) Validate() error {
@@ -179,9 +182,10 @@ func (cfg *Config) createKeyspace() error {
179182

180183
// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra.
181184
type StorageClient struct {
182-
cfg Config
183-
schemaCfg chunk.SchemaConfig
184-
session *gocql.Session
185+
cfg Config
186+
schemaCfg chunk.SchemaConfig
187+
session *gocql.Session
188+
querySemaphore *semaphore.Weighted
185189
}
186190

187191
// NewStorageClient returns a new StorageClient.
@@ -193,10 +197,16 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient,
193197
return nil, errors.WithStack(err)
194198
}
195199

200+
var querySemaphore *semaphore.Weighted
201+
if cfg.QueryConcurrency > 0 {
202+
querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency))
203+
}
204+
196205
client := &StorageClient{
197-
cfg: cfg,
198-
schemaCfg: schemaCfg,
199-
session: session,
206+
cfg: cfg,
207+
schemaCfg: schemaCfg,
208+
session: session,
209+
querySemaphore: querySemaphore,
200210
}
201211
return client, nil
202212
}
@@ -252,6 +262,13 @@ func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQue
252262
}
253263

254264
func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
265+
if s.querySemaphore != nil {
266+
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
267+
return err
268+
}
269+
defer s.querySemaphore.Release(1)
270+
}
271+
255272
var q *gocql.Query
256273

257274
switch {
@@ -358,6 +375,13 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c
358375
}
359376

360377
func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
378+
if s.querySemaphore != nil {
379+
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
380+
return input, err
381+
}
382+
defer s.querySemaphore.Release(1)
383+
}
384+
361385
tableName, err := s.schemaCfg.ChunkTableFor(input.From)
362386
if err != nil {
363387
return input, err

vendor/golang.org/x/sync/semaphore/semaphore.go

+136
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/modules.txt

+2-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)