Total Worker Parallelism in the Querier #2456

Merged 48 commits on Apr 24, 2020
Changes from 47 commits
Commits
50fb3c5
Added param
joe-elliott Apr 10, 2020
1efc6d4
First pass new structure
joe-elliott Apr 13, 2020
3cda9ed
Split out manager into its own code
joe-elliott Apr 13, 2020
662e364
created interface to help with testing
joe-elliott Apr 13, 2020
bae5fdf
Cleaned up notes. Addressed concurrency issues
joe-elliott Apr 13, 2020
cecf9b2
Added parallelism reset
joe-elliott Apr 13, 2020
55b0d21
Added total parallelism support
joe-elliott Apr 13, 2020
c0373c5
Mirrored all tests for total parallelism
joe-elliott Apr 13, 2020
b7f1858
Added clarity
joe-elliott Apr 13, 2020
818bb94
Removed upstream interface
joe-elliott Apr 13, 2020
6c5e68b
Added first worker test
joe-elliott Apr 13, 2020
dcf2e45
Added concurrency test
joe-elliott Apr 13, 2020
2e75c3f
TestTable for Concurrency
joe-elliott Apr 13, 2020
9c064cf
Added series of test cases
joe-elliott Apr 13, 2020
203ee30
Added test for failed receive. Fixed graceful quit shutdown
joe-elliott Apr 13, 2020
67119bc
Added tests for number of calls made to the handler
joe-elliott Apr 13, 2020
c2cc86f
Added test for cancelling service context cancelling processes
joe-elliott Apr 13, 2020
9f2b58b
Added changelog entry and updated docs
joe-elliott Apr 13, 2020
992a3e2
lint
joe-elliott Apr 13, 2020
9e504c3
lint part deux
joe-elliott Apr 13, 2020
9593d25
make doc
joe-elliott Apr 14, 2020
0fb4098
Added resetParallelism tests and fixed bug
joe-elliott Apr 14, 2020
248ec80
Added stop test to resetParallelism
joe-elliott Apr 14, 2020
ebd667f
Added test names
joe-elliott Apr 14, 2020
670e157
Cleaned up resetParallelism
joe-elliott Apr 14, 2020
b77ba05
Added DNS Watch tests
joe-elliott Apr 14, 2020
19cbec9
lint
joe-elliott Apr 14, 2020
819b207
Removed racey dns watcher tests
joe-elliott Apr 14, 2020
79ad8fb
Merge branch 'master' into querier-parallelism
joe-elliott Apr 14, 2020
3fcbe98
Merge branch 'master' into querier-parallelism
joe-elliott Apr 14, 2020
0fc3b44
Changed to worker-match-max-concurrent
joe-elliott Apr 16, 2020
4729723
Removed unnecessary param
joe-elliott Apr 16, 2020
692b623
Swapped to client config instead of bare param
joe-elliott Apr 16, 2020
75425bd
Use DialContext()
joe-elliott Apr 16, 2020
b18c376
Added comments/logs around concurrency distribution
joe-elliott Apr 16, 2020
94dbddd
Swapped to force cancel via context
joe-elliott Apr 16, 2020
89efeee
Merge branch 'master' into querier-parallelism
joe-elliott Apr 16, 2020
be78bab
Removed outdated comment
joe-elliott Apr 16, 2020
9910487
Added match_max_concurrent to example single binary configs
joe-elliott Apr 17, 2020
adda6a5
Improved comments
joe-elliott Apr 17, 2020
e868e4f
Added log for scenario where we can't find in a map
joe-elliott Apr 17, 2020
ea97cb8
Removed unnecessary nesting
joe-elliott Apr 17, 2020
d209888
Moved concurrency to its own method
joe-elliott Apr 17, 2020
ec7809e
Removed managerCtx/Cancel
joe-elliott Apr 17, 2020
9eb6218
Fixed expected value on test
joe-elliott Apr 17, 2020
11c94b8
Merge branch 'master' into querier-parallelism
joe-elliott Apr 17, 2020
ac2fc0f
Removed WithBlock() option
joe-elliott Apr 19, 2020
cd54802
Remove deleted address from manager map
joe-elliott Apr 22, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
* [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371
* [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-<prefix>.fifocache.size` CLI flag has been renamed to `-<prefix>.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-<prefix>.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319
+* [ENHANCEMENT] Added `-querier.worker-match-max-concurrent`. Force worker concurrency to match the `-querier.max-concurrent` option. Overrides `-querier.worker-parallelism`. #2456
* [ENHANCEMENT] Added the following metrics for monitoring delete requests: #2445
- `cortex_purger_delete_requests_received_total`: Number of delete requests received per user.
- `cortex_purger_delete_requests_processed_total`: Number of delete requests processed per user.
10 changes: 8 additions & 2 deletions docs/configuration/arguments.md
@@ -16,7 +16,7 @@ Duration arguments should be specified with a unit like `5s` or `3h`. Valid time
- `-querier.max-concurrent`

The maximum number of top-level PromQL queries that will execute at the same time, per querier process.
-If using the query frontend, this should be set to at least (`querier.worker-parallelism` * number of query frontend replicas). Otherwise queries may queue in the queriers and not the frontend, which will affect QoS.
+If using the query frontend, this should be set to at least (`-querier.worker-parallelism` * number of query frontend replicas). Otherwise queries may queue in the queriers and not the frontend, which will affect QoS. Alternatively, consider using `-querier.worker-match-max-concurrent` to force worker parallelism to match `-querier.max-concurrent`.

- `-querier.query-parallelism`

@@ -42,9 +42,15 @@ The next three options only apply when the querier is used together with the Que

- `-querier.worker-parallelism`

-Number of simultaneous queries to process, per worker process.
+Number of simultaneous queries to process, per query frontend.
See note on `-querier.max-concurrent`

+- `-querier.worker-match-max-concurrent`
+
+Force worker concurrency to match the -querier.max-concurrent option. Overrides `-querier.worker-parallelism`.
+See note on `-querier.max-concurrent`
+
+
## Querier and Ruler

The ingester query API was improved over time, but defaults to the old behaviour for backwards-compatibility. For best results both of these next two flags should be set to `true`:
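
The sizing note on `-querier.max-concurrent` in the `arguments.md` change above comes down to simple arithmetic. The following is a minimal sketch of that check, not code from this PR; the helper names `totalWorkers` and `queuesInQuerier` and the deployment numbers are hypothetical.

```go
package main

import "fmt"

// totalWorkers returns how many queries a single querier can be handed at
// once when it runs workerParallelism workers against each query frontend.
func totalWorkers(workerParallelism, frontendReplicas int) int {
	return workerParallelism * frontendReplicas
}

// queuesInQuerier reports whether the querier could receive more queries
// than -querier.max-concurrent lets it execute, in which case the excess
// queues in the querier instead of in the query frontend.
func queuesInQuerier(maxConcurrent, workerParallelism, frontendReplicas int) bool {
	return totalWorkers(workerParallelism, frontendReplicas) > maxConcurrent
}

func main() {
	// Hypothetical deployment: 10 workers per frontend, 3 frontend replicas.
	fmt.Println(totalWorkers(10, 3))        // 30
	fmt.Println(queuesInQuerier(20, 10, 3)) // true: 30 workers, 20 slots
	fmt.Println(queuesInQuerier(30, 10, 3)) // false: 30 workers, 30 slots
}
```

With `-querier.worker-match-max-concurrent` enabled, the documentation above says worker concurrency is forced to match `-querier.max-concurrent`, so this manual check should no longer be needed.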
7 changes: 6 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -1801,10 +1801,15 @@ The `frontend_worker_config` configures the worker - running within the Cortex q
# CLI flag: -querier.frontend-address
[frontend_address: <string> | default = ""]

-# Number of simultaneous queries to process.
+# Number of simultaneous queries to process per query frontend.
# CLI flag: -querier.worker-parallelism
[parallelism: <int> | default = 10]

+# Force worker concurrency to match the -querier.max-concurrent option.
+# Overrides querier.worker-parallelism.
+# CLI flag: -querier.worker-match-max-concurrent
+[match_max_concurrent: <boolean> | default = false]
+
# How often to query DNS.
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 10s]
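
To illustrate how the two options in the reference above map onto CLI flags, here is a minimal sketch using only the standard `flag` package. The field names `Parallelism` and `MatchMaxConcurrency` and the flag names and defaults come from this PR; the trimmed-down struct, the `RegisterFlags` body, and the `parse` helper are assumptions for illustration, not the actual Cortex code.

```go
package main

import (
	"flag"
	"fmt"
)

// WorkerConfig is an illustrative stand-in that holds only the two
// settings discussed here; the real struct has more fields.
type WorkerConfig struct {
	Parallelism         int
	MatchMaxConcurrency bool
}

// RegisterFlags binds the documented flags, with their documented
// defaults, to the config fields.
func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10,
		"Number of simultaneous queries to process per query frontend.")
	f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false,
		"Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.")
}

// parse is a hypothetical helper that builds a config from CLI arguments.
func parse(args []string) (WorkerConfig, error) {
	var cfg WorkerConfig
	fs := flag.NewFlagSet("querier", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parse([]string{"-querier.worker-match-max-concurrent=true"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {Parallelism:10 MatchMaxConcurrency:true}
}
```

Note that `parallelism` keeps its default of 10 even when `match_max_concurrent` is set; per the documentation above, the latter simply takes precedence.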
3 changes: 3 additions & 0 deletions docs/configuration/single-process-config-blocks-gossip-1.yaml
@@ -80,3 +80,6 @@ tsdb:
backend: filesystem # s3, gcs, azure or filesystem are valid options
filesystem:
dir: /tmp/cortex/storage
+
+frontend_worker:
+  match_max_concurrent: true
3 changes: 3 additions & 0 deletions docs/configuration/single-process-config-blocks-gossip-2.yaml
@@ -79,3 +79,6 @@ tsdb:
backend: filesystem # s3, gcs, azure or filesystem are valid options
filesystem:
dir: /tmp/cortex/storage
+
+frontend_worker:
+  match_max_concurrent: true
3 changes: 3 additions & 0 deletions docs/configuration/single-process-config-blocks.yaml
@@ -82,3 +82,6 @@ compactor:
sharding_ring:
kvstore:
store: inmemory
+
+frontend_worker:
+  match_max_concurrent: true
5 changes: 5 additions & 0 deletions docs/configuration/single-process-config.md
@@ -71,4 +71,9 @@ storage:

filesystem:
directory: /tmp/cortex/chunks
+
+# Configure the frontend worker in the querier to match worker count
+# to max_concurrent on the queriers.
+frontend_worker:
+  match_max_concurrent: true
```
3 changes: 3 additions & 0 deletions docs/configuration/single-process-config.yaml
@@ -75,3 +75,6 @@ storage:
purger:
enable: true
object_store_type: filesystem
+
+frontend_worker:
+  match_max_concurrent: true
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
@@ -210,7 +210,7 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) {

// Query frontend worker will only be started after all its dependencies are started, not here.
// Worker may also be nil, if not configured, which is OK.
-worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger)
+worker, err := frontend.NewWorker(cfg.Worker, cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger)
if err != nil {
return
}
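
`NewWorker` now receives the querier config, presumably so the worker can read `MaxConcurrent` when `match_max_concurrent` is set. The worker source is not part of the diff shown here, so the following is only a sketch of one plausible way to split a concurrency budget across query frontends. The function `concurrencyPerFrontend`, its parameters, and the "at least one connection per frontend" rule are all assumptions, not the PR's confirmed implementation.

```go
package main

import "fmt"

// concurrencyPerFrontend is a hypothetical helper. It returns the number of
// concurrent workers to run against each of `frontends` query frontends.
//
// When matchMaxConcurrent is false, every frontend gets `parallelism`
// workers. When it is true, maxConcurrent is divided as evenly as possible,
// with the remainder going to the first few frontends.
func concurrencyPerFrontend(matchMaxConcurrent bool, maxConcurrent, parallelism, frontends int) []int {
	out := make([]int, frontends)
	for i := range out {
		if !matchMaxConcurrent {
			out[i] = parallelism
			continue
		}
		out[i] = maxConcurrent / frontends
		if i < maxConcurrent%frontends {
			out[i]++
		}
		// Assumption: never leave a frontend without a worker, even if
		// that pushes the total above maxConcurrent.
		if out[i] == 0 {
			out[i] = 1
		}
	}
	return out
}

func main() {
	fmt.Println(concurrencyPerFrontend(false, 10, 5, 3)) // [5 5 5]
	fmt.Println(concurrencyPerFrontend(true, 10, 5, 3))  // [4 3 3]
	fmt.Println(concurrencyPerFrontend(true, 2, 5, 3))   // [1 1 1]
}
```

The last call shows the trade-off in this sketch: with more frontends than `maxConcurrent`, guaranteeing one worker per frontend means the total (3) exceeds the configured limit (2).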
22 changes: 15 additions & 7 deletions pkg/querier/frontend/frontend_test.go
@@ -26,6 +26,7 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc"

+"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)
@@ -55,7 +56,8 @@ func TestFrontend(t *testing.T) {

assert.Equal(t, "Hello World", string(body))
}
-testFrontend(t, handler, test)
+testFrontend(t, handler, test, false)
+testFrontend(t, handler, test, true)
}

func TestFrontendPropagateTrace(t *testing.T) {
@@ -104,7 +106,8 @@ func TestFrontendPropagateTrace(t *testing.T) {
// Query should do one calls.
assert.Equal(t, traceID, <-observedTraceID)
}
-testFrontend(t, handler, test)
+testFrontend(t, handler, test, false)
+testFrontend(t, handler, test, true)
}

// TestFrontendCancel ensures that when client requests are cancelled,
@@ -135,7 +138,9 @@ func TestFrontendCancel(t *testing.T) {
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int32(1), atomic.LoadInt32(&tries))
}
-testFrontend(t, handler, test)
+testFrontend(t, handler, test, false)
+tries = 0
+testFrontend(t, handler, test, true)
}

func TestFrontendCancelStatusCode(t *testing.T) {
@@ -156,15 +161,18 @@ func TestFrontendCancelStatusCode(t *testing.T) {
}
}

-func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
+func testFrontend(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool) {
logger := log.NewNopLogger()

var (
-config Config
-workerConfig WorkerConfig
+config Config
+workerConfig WorkerConfig
+querierConfig querier.Config
)
flagext.DefaultValues(&config, &workerConfig)
workerConfig.Parallelism = 1
+workerConfig.MatchMaxConcurrency = matchMaxConcurrency
+querierConfig.MaxConcurrent = 1

// localhost:0 prevents firewall warnings on Mac OS X.
grpcListen, err := net.Listen("tcp", "localhost:0")
@@ -196,7 +204,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
go httpServer.Serve(httpListen) //nolint:errcheck
go grpcServer.Serve(grpcListen) //nolint:errcheck

-worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger)
+worker, err := NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
