Skip to content

Commit 0a19a7d

Browse files
authored
Add concurrent evaluation for ruler (#5766)
* add concurrent evaluation for ruler
  Signed-off-by: Ben Ye <[email protected]>
* format
  Signed-off-by: Ben Ye <[email protected]>
* update doc
  Signed-off-by: Ben Ye <[email protected]>
* changelog
  Signed-off-by: Ben Ye <[email protected]>
* fix ooo chunks test
  Signed-off-by: Ben Ye <[email protected]>
* fix vendor
  Signed-off-by: Ben Ye <[email protected]>
---------
Signed-off-by: Ben Ye <[email protected]>
1 parent df0a2dc commit 0a19a7d

File tree

16 files changed

+87
-946
lines changed

16 files changed

+87
-946
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
1111
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
1212
* [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731
13+
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
1314
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
1415
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
1516
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684

docs/configuration/config-file-reference.md

+10
Original file line numberDiff line numberDiff line change
@@ -3968,6 +3968,16 @@ alertmanager_client:
39683968
# CLI flag: -ruler.resend-delay
39693969
[resend_delay: <duration> | default = 1m]
39703970
3971+
# If enabled, rules from a single rule group can be evaluated concurrently if
3972+
# there is no dependency between each other. Max concurrency for each rule group
3973+
# is controlled via ruler.max-concurrent-evals flag.
3974+
# CLI flag: -ruler.concurrent-evals-enabled
3975+
[concurrent_evals_enabled: <boolean> | default = false]
3976+
3977+
# Max concurrency for a single rule group to evaluate independent rules.
3978+
# CLI flag: -ruler.max-concurrent-evals
3979+
[max_concurrent_evals: <int> | default = 1]
3980+
39713981
# Distribute rule evaluation using ring backend
39723982
# CLI flag: -ruler.enable-sharding
39733983
[enable_sharding: <boolean> | default = false]

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ require (
4646
github.com/prometheus/client_model v0.5.0
4747
github.com/prometheus/common v0.46.0
4848
// Prometheus maps version 2.x.y to tags v0.x.y.
49-
github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce
49+
github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6
5050
github.com/segmentio/fasthash v1.0.3
5151
github.com/sony/gobreaker v0.5.0
5252
github.com/spf13/afero v1.9.5

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1320,8 +1320,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
13201320
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
13211321
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
13221322
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
1323-
github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce h1:gHCPxX6dJZJOZh/nYy0DmnTu2PbgWjs8hY0eLgofPfA=
1324-
github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
1323+
github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6 h1:E1dnG12fSlUeHST75LpGqPpd/YCOSNqKD2CUmm3Em90=
1324+
github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
13251325
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
13261326
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
13271327
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=

pkg/compactor/compactor_test.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/json"
88
"flag"
99
"fmt"
10+
"io"
1011
"os"
1112
"path"
1213
"path/filepath"
@@ -30,7 +31,6 @@ import (
3031
"github.com/thanos-io/thanos/pkg/block"
3132
"github.com/thanos-io/thanos/pkg/block/metadata"
3233
"github.com/thanos-io/thanos/pkg/compact"
33-
thanos_testutil "github.com/thanos-io/thanos/pkg/testutil/e2eutil"
3434
"gopkg.in/yaml.v2"
3535

3636
"github.com/cortexproject/cortex/pkg/ring"
@@ -1065,8 +1065,19 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) {
10651065
b1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, map[string]string{"__name__": "Teste"})
10661066
b2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, map[string]string{"__name__": "Teste"})
10671067

1068-
err := thanos_testutil.PutOutOfOrderIndex(path.Join(tmpDir, "user-1", b1.String()), 10, 20)
1068+
// Read bad index file.
1069+
indexFile, err := os.Open("testdata/out_of_order_chunks/index")
10691070
require.NoError(t, err)
1071+
indexFileStat, err := indexFile.Stat()
1072+
require.NoError(t, err)
1073+
1074+
dir := path.Join(tmpDir, "user-1", b1.String())
1075+
outputFile, err := os.OpenFile(path.Join(dir, "index"), os.O_RDWR|os.O_TRUNC, 0755)
1076+
require.NoError(t, err)
1077+
1078+
n, err := io.Copy(outputFile, indexFile)
1079+
require.NoError(t, err)
1080+
require.Equal(t, indexFileStat.Size(), n)
10701081

10711082
cfg := prepareConfig()
10721083
cfg.SkipBlocksWithOutOfOrderChunksEnabled = true
@@ -1097,7 +1108,7 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) {
10971108

10981109
// Wait until a run has completed.
10991110
cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} {
1100-
if _, err := os.Stat(path.Join(tmpDir, "user-1", b1.String(), "no-compact-mark.json")); err == nil {
1111+
if _, err := os.Stat(path.Join(dir, "no-compact-mark.json")); err == nil {
11011112
return true
11021113
}
11031114
return false

pkg/compactor/testdata/out_of_order_chunks/index

224 Bytes
Binary file not shown.

pkg/ruler/compat.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -325,17 +325,19 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
325325
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
326326

327327
return rules.NewManager(&rules.ManagerOptions{
328-
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
329-
Queryable: q,
330-
QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger),
331-
Context: user.InjectOrgID(ctx, userID),
332-
ExternalURL: cfg.ExternalURL.URL,
333-
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
334-
Logger: log.With(logger, "user", userID),
335-
Registerer: reg,
336-
OutageTolerance: cfg.OutageTolerance,
337-
ForGracePeriod: cfg.ForGracePeriod,
338-
ResendDelay: cfg.ResendDelay,
328+
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
329+
Queryable: q,
330+
QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger),
331+
Context: user.InjectOrgID(ctx, userID),
332+
ExternalURL: cfg.ExternalURL.URL,
333+
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
334+
Logger: log.With(logger, "user", userID),
335+
Registerer: reg,
336+
OutageTolerance: cfg.OutageTolerance,
337+
ForGracePeriod: cfg.ForGracePeriod,
338+
ResendDelay: cfg.ResendDelay,
339+
ConcurrentEvalsEnabled: cfg.ConcurrentEvalsEnabled,
340+
MaxConcurrentEvals: cfg.MaxConcurrentEvals,
339341
})
340342
}
341343
}

pkg/ruler/ruler.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ var (
4444
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
4545

4646
// Validation errors.
47-
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
48-
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
47+
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
48+
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
49+
errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0")
4950
)
5051

5152
const (
@@ -95,7 +96,7 @@ type Config struct {
9596
RulePath string `yaml:"rule_path"`
9697

9798
// URL of the Alertmanager to send notifications to.
98-
// If your are configuring the ruler to send to a Cortex Alertmanager,
99+
// If you are configuring the ruler to send to a Cortex Alertmanager,
99100
// ensure this includes any path set in the Alertmanager external URL.
100101
AlertmanagerURL string `yaml:"alertmanager_url"`
101102
// Whether to use DNS SRV records to discover Alertmanager.
@@ -118,6 +119,9 @@ type Config struct {
118119
// Minimum amount of time to wait before resending an alert to Alertmanager.
119120
ResendDelay time.Duration `yaml:"resend_delay"`
120121

122+
ConcurrentEvalsEnabled bool `yaml:"concurrent_evals_enabled"`
123+
MaxConcurrentEvals int64 `yaml:"max_concurrent_evals"`
124+
121125
// Enable sharding rule groups.
122126
EnableSharding bool `yaml:"enable_sharding"`
123127
ShardingStrategy string `yaml:"sharding_strategy"`
@@ -149,6 +153,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {
149153
if err := cfg.ClientTLSConfig.Validate(log); err != nil {
150154
return errors.Wrap(err, "invalid ruler gRPC client config")
151155
}
156+
157+
if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 {
158+
return errInvalidMaxConcurrentEvals
159+
}
152160
return nil
153161
}
154162

@@ -188,6 +196,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
188196
f.DurationVar(&cfg.OutageTolerance, "ruler.for-outage-tolerance", time.Hour, `Max time to tolerate outage for restoring "for" state of alert.`)
189197
f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`)
190198
f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`)
199+
f.BoolVar(&cfg.ConcurrentEvalsEnabled, "ruler.concurrent-evals-enabled", false, `If enabled, rules from a single rule group can be evaluated concurrently if there is no dependency between each other. Max concurrency for each rule group is controlled via ruler.max-concurrent-evals flag.`)
200+
f.Int64Var(&cfg.MaxConcurrentEvals, "ruler.max-concurrent-evals", 1, `Max concurrency for a single rule group to evaluate independent rules.`)
191201

192202
f.Var(&cfg.EnabledTenants, "ruler.enabled-tenants", "Comma separated list of tenants whose rules this ruler can evaluate. If specified, only these tenants will be handled by ruler, otherwise this ruler can process rules from all tenants. Subject to sharding.")
193203
f.Var(&cfg.DisabledTenants, "ruler.disabled-tenants", "Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.")

vendor/github.com/prometheus/prometheus/storage/remote/client.go

+6-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/prometheus/prometheus/tsdb/index/index.go

+26-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/copy.go

-55
This file was deleted.

vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/port.go

-20
This file was deleted.

0 commit comments

Comments
 (0)