Commit 7040830

simonswine, stevesg, MichelHollands, and jtlisi committed
Add concurrency to the mergeQueryable (cortexproject#4065)
* Add concurrency to the mergeQueryable. This should significantly parallelize tenant federation queries. It also aligns the error and warning wrapping and adds tests for those cases.
* Apply suggestions from code review.
* Simplify concurrency of mergeDistinctStringSlice: avoid a background goroutine receiving results, propagate the errGroup context, and improve documentation comments.
* Add changelog entry.
* Limit maximum concurrency of mergeQueryable. In the first instance the limit is hard-coded, but that might become a configurable option.
* End job early if the context is already closed.
* Handle context cancellation in concurrency.ForEach. This is beneficial if the methods are not handling context cancellation themselves.
* Avoid using an interface pointer.
* Handle errors properly for storage.SeriesSet.
* Improve warning and error message readability.
* Simplify mockTenantQueryableWithFilter: reuse the queryable struct and avoid using the context for passing parameters around.
* Improve wording on a comment.

Signed-off-by: Christian Simon <[email protected]>
Co-authored-by: Steve Simpson <[email protected]>
Co-authored-by: Michel Hollands <[email protected]>
Co-authored-by: Jacob Lisi <[email protected]>
1 parent 66433f1 commit 7040830
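Both `LabelValues`/`LabelNames` and `Select` now build a slice of per-tenant jobs and hand them to `concurrency.ForEach(ctx, jobs, maxConcurrency, run)` from `pkg/util/concurrency`. That helper's implementation is not part of this diff; the following is only a rough sketch of how such a bounded, context-aware fan-out could look. The name `ForEachSketch` and the use of `errgroup` are assumptions for illustration, not the actual Cortex code:

```go
package concurrencysketch

import (
    "context"

    "golang.org/x/sync/errgroup"
)

// ForEachSketch is a hypothetical stand-in for Cortex's concurrency.ForEach:
// it runs `run` for every job using at most `concurrency` goroutines and
// returns the first error. Workers stop picking up new jobs once the group
// context is cancelled, mirroring the "end job early if context already
// closed" behaviour described in the commit message.
func ForEachSketch(ctx context.Context, jobs []interface{}, concurrency int, run func(ctx context.Context, job interface{}) error) error {
    ch := make(chan interface{}, len(jobs))
    for _, job := range jobs {
        ch <- job
    }
    close(ch)

    g, gCtx := errgroup.WithContext(ctx)
    for i := 0; i < concurrency && i < len(jobs); i++ {
        g.Go(func() error {
            for job := range ch {
                // A failed job cancels gCtx, so the remaining jobs are skipped.
                if err := gCtx.Err(); err != nil {
                    return err
                }
                if err := run(gCtx, job); err != nil {
                    return err
                }
            }
            return nil
        })
    }
    return g.Wait()
}
```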

File tree: 5 files changed (+332, -63 lines)


CHANGELOG.md

+19-2
@@ -11,6 +11,16 @@
   * `-alertmanager.cluster.advertise-address` instead of `-cluster.advertise-address`
   * `-alertmanager.cluster.peers` instead of `-cluster.peer`
   * `-alertmanager.cluster.peer-timeout` instead of `-cluster.peer-timeout`
+* [CHANGE] Blocks storage: removed the config option `-blocks-storage.bucket-store.index-cache.postings-compression-enabled`, which was deprecated in Cortex 1.6. Postings compression is always enabled. #4101
+* [CHANGE] Querier: removed the config option `-store.max-look-back-period`, which was deprecated in Cortex 1.6 and was used only by the chunks storage. You should use `-querier.max-query-lookback` instead. #4101
+* [CHANGE] Query Frontend: removed the config option `-querier.compress-http-responses`, which was deprecated in Cortex 1.6. You should use `-api.response-compression-enabled` instead. #4101
+* [CHANGE] Runtime-config / overrides: removed the config options `-limits.per-user-override-config` (use `-runtime-config.file`) and `-limits.per-user-override-period` (use `-runtime-config.reload-period`), both deprecated since Cortex 0.6.0. #4112
+* [FEATURE] The following features have been marked as stable: #4101
+  - Shuffle-sharding
+  - Querier support for querying chunks and blocks store at the same time
+  - Tracking of active series and exporting them as metrics (`-ingester.active-series-metrics-enabled` and related flags)
+  - Blocks storage: lazy mmap of block indexes in the store-gateway (`-blocks-storage.bucket-store.index-header-lazy-loading-enabled`)
+  - Ingester: close idle TSDB and remove them from local disk (`-blocks-storage.tsdb.close-idle-tsdb-timeout`)
 * [FEATURE] Memberlist: add TLS configuration options for the memberlist transport layer used by the gossip KV store. #4046
   * New flags added for memberlist communication:
     * `-memberlist.tls-enabled`
@@ -27,6 +37,8 @@
 * [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
   * `cortex_ruler_clients`
   * `cortex_ruler_client_request_duration_seconds`
+* [ENHANCEMENT] Alertmanager: Add API endpoint to list all tenant alertmanager configs: `GET /multitenant_alertmanager/configs`. #3529
+* [ENHANCEMENT] Ruler: Add API endpoint to list all tenant ruler rule groups: `GET /ruler/rule_groups`. #3529
 * [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
 * [ENHANCEMENT] Query-frontend: reduced memory allocations when serializing query response. #3964
 * [ENHANCEMENT] Querier / ruler: some optimizations to PromQL query engine. #3934 #3989
@@ -36,13 +48,17 @@
 * [ENHANCEMENT] Allow use of `y|w|d` suffixes for duration related limits and per-tenant limits. #4044
 * [ENHANCEMENT] Query-frontend: Small optimization on top of PR #3968 to avoid unnecessary Extents merging. #4026
 * [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
-* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
+* [ENHANCEMENT] Ingester: added following per-ingester (instance) experimental limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
 * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
-* [ENHANCEMENT] Distributor: added per-distributor limits: max number of inflight requests (`-distributor.instance-limits.max-inflight-push-requests`) and max ingestion rate in samples/sec (`-distributor.instance-limits.max-ingestion-rate`). If not set, these two are unlimited. Also added metrics to expose current values (`cortex_distributor_inflight_push_requests`, `cortex_distributor_ingestion_rate_samples_per_second`) as well as limits (`cortex_distributor_instance_limits` with various `limit` label values). #4071
+* [ENHANCEMENT] Distributor: added per-distributor experimental limits: max number of inflight requests (`-distributor.instance-limits.max-inflight-push-requests`) and max ingestion rate in samples/sec (`-distributor.instance-limits.max-ingestion-rate`). If not set, these two are unlimited. Also added metrics to expose current values (`cortex_distributor_inflight_push_requests`, `cortex_distributor_ingestion_rate_samples_per_second`) as well as limits (`cortex_distributor_instance_limits` with various `limit` label values). #4071
 * [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
 * [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
 * [ENHANCEMENT] Alertmanager: validate configured `-alertmanager.web.external-url` and fail if ends with `/`. #4081
+* [ENHANCEMENT] Alertmanager: added `-alertmanager.receivers-firewall.block.cidr-networks` and `-alertmanager.receivers-firewall.block.private-addresses` to block specific network addresses in HTTP-based Alertmanager receiver integrations. #4085
 * [ENHANCEMENT] Allow configuration of Cassandra's host selection policy. #4069
+* [ENHANCEMENT] Store-gateway: retry synching blocks if a per-tenant sync fails. #3975 #4088
+* [ENHANCEMENT] Add metric `cortex_tcp_connections` exposing the current number of accepted TCP connections. #4099
+* [ENHANCEMENT] Querier: Allow federated queries to run concurrently. #4065
 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
@@ -55,6 +71,7 @@
 * [BUGFIX] Ruler: Rule group limit enforcement should now allow the same number of rules in a group as the limit. #3615
 * [BUGFIX] Frontend, Query-scheduler: allow querier to notify about shutdown without providing any authentication. #4066
 * [BUGFIX] Querier: fixed race condition causing queries to fail right after querier startup with the "empty ring" error. #4068
+* [BUGFIX] Compactor: Increment `cortex_compactor_runs_failed_total` if compactor failed compact a single tenant. #4094
 
 ## Blocksconvert
 
pkg/querier/tenantfederation/merge_queryable.go

+114-26
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "sort"
+    "strings"
 
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/pkg/labels"
@@ -13,12 +14,14 @@ import (
     "github.com/weaveworks/common/user"
 
     "github.com/cortexproject/cortex/pkg/tenant"
+    "github.com/cortexproject/cortex/pkg/util/concurrency"
 )
 
 const (
     defaultTenantLabel         = "__tenant_id__"
     retainExistingPrefix       = "original_"
     originalDefaultTenantLabel = retainExistingPrefix + defaultTenantLabel
+    maxConcurrency             = 16
 )
 
 // NewQueryable returns a queryable that iterates through all the tenant IDs
@@ -65,6 +68,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
     }
 
     return &mergeQuerier{
+        ctx:       ctx,
         queriers:  queriers,
         tenantIDs: tenantIDs,
     }, nil
@@ -77,6 +81,7 @@
 // overwritten by the tenant ID and the previous value is exposed through a new
 // label prefixed with "original_". This behaviour is not implemented recursively
 type mergeQuerier struct {
+    ctx       context.Context
     queriers  []storage.Querier
     tenantIDs []string
 }
@@ -97,7 +102,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
         name = defaultTenantLabel
     }
 
-    return m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) {
+    return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
         return q.LabelValues(name, matchers...)
     })
 }
@@ -106,7 +111,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
 // queriers. It also adds the defaultTenantLabel and if present in the original
 // results the originalDefaultTenantLabel
 func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
-    labelNames, warnings, err := m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) {
+    labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
         return q.LabelNames()
     })
     if err != nil {
@@ -137,27 +142,64 @@
     return labelNames, warnings, nil
 }
 
-type stringSliceFunc func(storage.Querier) ([]string, storage.Warnings, error)
+type stringSliceFunc func(context.Context, storage.Querier) ([]string, storage.Warnings, error)
+
+type stringSliceFuncJob struct {
+    querier  storage.Querier
+    tenantID string
+    result   []string
+    warnings storage.Warnings
+}
 
 // mergeDistinctStringSlice is aggregating results from stringSliceFunc calls
-// on a querier. It removes duplicates and sorts the result. It doesn't require
-// the output of the stringSliceFunc to be sorted, as results of LabelValues
-// are not sorted.
-//
-// TODO: Consider running stringSliceFunc calls concurrently
+// on per querier in parallel. It removes duplicates and sorts the result. It
+// doesn't require the output of the stringSliceFunc to be sorted, as results
+// of LabelValues are not sorted.
 func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, storage.Warnings, error) {
+    var jobs = make([]interface{}, len(m.tenantIDs))
+
+    for pos := range m.tenantIDs {
+        jobs[pos] = &stringSliceFuncJob{
+            querier:  m.queriers[pos],
+            tenantID: m.tenantIDs[pos],
+        }
+    }
+
+    run := func(ctx context.Context, jobIntf interface{}) error {
+        job, ok := jobIntf.(*stringSliceFuncJob)
+        if !ok {
+            return fmt.Errorf("unexpected type %T", jobIntf)
+        }
+
+        var err error
+        job.result, job.warnings, err = f(ctx, job.querier)
+        if err != nil {
+            return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID)
+        }
+
+        return nil
+    }
+
+    err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    // aggregate warnings and deduplicate string results
     var warnings storage.Warnings
     resultMap := make(map[string]struct{})
-    for pos, tenantID := range m.tenantIDs {
-        result, resultWarnings, err := f(m.queriers[pos])
-        if err != nil {
-            return nil, nil, err
+    for _, jobIntf := range jobs {
+        job, ok := jobIntf.(*stringSliceFuncJob)
+        if !ok {
+            return nil, nil, fmt.Errorf("unexpected type %T", jobIntf)
         }
-        for _, e := range result {
+
+        for _, e := range job.result {
             resultMap[e] = struct{}{}
         }
-        for _, w := range resultWarnings {
-            warnings = append(warnings, fmt.Errorf("error querying tenant id %s: %w", tenantID, w))
+
+        for _, w := range job.warnings {
+            warnings = append(warnings, errors.Wrapf(w, "warning querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID))
         }
     }
 
@@ -173,33 +215,60 @@
 func (m *mergeQuerier) Close() error {
     errs := tsdb_errors.NewMulti()
     for pos, tenantID := range m.tenantIDs {
-        errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for tenant id %s", tenantID))
+        errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for %s %s", rewriteLabelName(defaultTenantLabel), tenantID))
     }
     return errs.Err()
 }
 
+type selectJob struct {
+    pos      int
+    querier  storage.Querier
+    tenantID string
+}
+
 // Select returns a set of series that matches the given label matchers. If the
 // tenantLabelName is matched on it only considers those queriers matching. The
 // forwarded labelSelector is not containing those that operate on
 // tenantLabelName.
 func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
     matchedTenants, filteredMatchers := filterValuesByMatchers(defaultTenantLabel, m.tenantIDs, matchers...)
-    var seriesSets = make([]storage.SeriesSet, 0, len(matchedTenants))
-    for pos, tenantID := range m.tenantIDs {
-        if _, matched := matchedTenants[tenantID]; !matched {
+    var jobs = make([]interface{}, len(matchedTenants))
+    var seriesSets = make([]storage.SeriesSet, len(matchedTenants))
+    var jobPos int
+    for tenantPos := range m.tenantIDs {
+        if _, matched := matchedTenants[m.tenantIDs[tenantPos]]; !matched {
             continue
         }
-        seriesSets = append(seriesSets, &addLabelsSeriesSet{
-            // TODO: Consider running Select calls concurrently
-            upstream: m.queriers[pos].Select(sortSeries, hints, filteredMatchers...),
+        jobs[jobPos] = &selectJob{
+            pos:      jobPos,
+            querier:  m.queriers[tenantPos],
+            tenantID: m.tenantIDs[tenantPos],
+        }
+        jobPos++
+    }
+
+    run := func(ctx context.Context, jobIntf interface{}) error {
+        job, ok := jobIntf.(*selectJob)
+        if !ok {
+            return fmt.Errorf("unexpected type %T", jobIntf)
+        }
+        seriesSets[job.pos] = &addLabelsSeriesSet{
+            upstream: job.querier.Select(sortSeries, hints, filteredMatchers...),
             labels: labels.Labels{
                 {
                     Name:  defaultTenantLabel,
-                    Value: tenantID,
+                    Value: job.tenantID,
                 },
             },
-        })
+        }
+        return nil
     }
+
+    err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
+    if err != nil {
+        return storage.ErrSeriesSet(err)
+    }
+
     return storage.NewMergeSeriesSet(seriesSets, storage.ChainedSeriesMerge)
 }
 
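`Select` pre-sizes `seriesSets` and has each job write to its own index (`seriesSets[job.pos]`), so the concurrent goroutines never touch the same element. Any `__tenant_id__` matcher both restricts which tenants get a job and is stripped from the matchers forwarded upstream by `filterValuesByMatchers`. A hypothetical caller-side snippet (the `printTeamASeries` function and the tenant value `team-a` are made up for illustration; the import paths are the standard Prometheus ones):

```go
package tenantfedexample

import (
    "fmt"
    "log"

    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/storage"
)

// printTeamASeries assumes q is the merge querier returned by the
// tenant-federation queryable. The __tenant_id__ matcher limits the fan-out
// to a single tenant and is not forwarded to that tenant's own querier.
func printTeamASeries(q storage.Querier) {
    matchers := []*labels.Matcher{
        labels.MustNewMatcher(labels.MatchEqual, "__name__", "up"),
        labels.MustNewMatcher(labels.MatchEqual, "__tenant_id__", "team-a"),
    }
    set := q.Select(true, nil, matchers...)
    for set.Next() {
        fmt.Println(set.At().Labels()) // every returned series carries __tenant_id__="team-a"
    }
    if err := set.Err(); err != nil {
        // With this change the error is wrapped for readability,
        // e.g. "error querying tenant_id team-a: ...".
        log.Println(err)
    }
}
```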

@@ -266,13 +335,32 @@ func (m *addLabelsSeriesSet) At() storage.Series {
 // The error that iteration as failed with.
 // When an error occurs, set cannot continue to iterate.
 func (m *addLabelsSeriesSet) Err() error {
-    return m.upstream.Err()
+    return errors.Wrapf(m.upstream.Err(), "error querying %s", labelsToString(m.labels))
 }
 
 // A collection of warnings for the whole set.
 // Warnings could be return even iteration has not failed with error.
 func (m *addLabelsSeriesSet) Warnings() storage.Warnings {
-    return m.upstream.Warnings()
+    upstream := m.upstream.Warnings()
+    warnings := make(storage.Warnings, len(upstream))
+    for pos := range upstream {
+        warnings[pos] = errors.Wrapf(upstream[pos], "warning querying %s", labelsToString(m.labels))
+    }
+    return warnings
+}
+
+// rewrite label name to be more readable in error output
+func rewriteLabelName(s string) string {
+    return strings.TrimRight(strings.TrimLeft(s, "_"), "_")
+}
+
+// this outputs a more readable error format
+func labelsToString(labels labels.Labels) string {
+    parts := make([]string, len(labels))
+    for pos, l := range labels {
+        parts[pos] = rewriteLabelName(l.Name) + " " + l.Value
+    }
+    return strings.Join(parts, ", ")
 }
 
 type addLabelsSeries struct {
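The two new helpers only affect how errors and warnings read: `rewriteLabelName` trims the surrounding underscores from `__tenant_id__`, and `labelsToString` joins label name/value pairs into a short phrase. A quick hypothetical snippet, which would live in the same `tenantfederation` package, showing the expected output:

```go
// Hypothetical snippet inside package tenantfederation; expected output in comments.
func demoLabelHelpers() {
    fmt.Println(rewriteLabelName(defaultTenantLabel)) // prints: tenant_id

    ls := labels.Labels{{Name: defaultTenantLabel, Value: "team-a"}}
    fmt.Println(labelsToString(ls)) // prints: tenant_id team-a
}
```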

0 commit comments
