Skip to content

Commit ca89b03

Browse files
authored
Added local backend support to new ruler storage config (#3932)
* Added local backend support to new ruler storage config Signed-off-by: Marco Pracucci <[email protected]> * Rolled back moving errors Signed-off-by: Marco Pracucci <[email protected]>
1 parent 50c4f54 commit ca89b03

File tree

18 files changed

+126
-111
lines changed

18 files changed

+126
-111
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* [CHANGE] Alertmanager now removes local files after Alertmanager is no longer running for removed or resharded user. #3910
66
* [CHANGE] Alertmanager now stores local files in per-tenant folders. Files stored by Alertmanager previously are migrated to new hierarchy. Support for this migration will be removed in Cortex 1.10. #3910
7+
* [FEATURE] Ruler Storage: Added `local` backend support to the ruler storage configuration under the `-ruler-storage.` flag prefix. #3932
78
* [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
89
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
910
* `cortex_ruler_clients`

docs/configuration/config-file-reference.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ tenant_federation:
155155

156156
ruler_storage:
157157
# Backend storage to use. Supported backends are: s3, gcs, azure, swift,
158-
# filesystem, configdb.
158+
# filesystem, configdb, local.
159159
# CLI flag: -ruler-storage.backend
160160
[backend: <string> | default = "s3"]
161161

@@ -356,6 +356,11 @@ ruler_storage:
356356
# The CLI flags prefix for this block config is: ruler-storage
357357
[configdb: <configstore_config>]
358358

359+
local:
360+
# Directory to scan for rules
361+
# CLI flag: -ruler-storage.local.directory
362+
[directory: <string> | default = ""]
363+
359364
# The configs_config configures the Cortex Configs DB and API.
360365
[configs: <configs_config>]
361366

pkg/cortex/modules.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
642642
if !t.Cfg.Ruler.StoreConfig.IsDefaults() {
643643
t.RulerStorage, err = ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, rules.FileLoader{}, util_log.Logger)
644644
} else {
645-
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
645+
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer)
646646
}
647647
return
648648
}

pkg/ruler/api.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"gopkg.in/yaml.v3"
2222

2323
"github.com/cortexproject/cortex/pkg/cortexpb"
24-
store "github.com/cortexproject/cortex/pkg/ruler/rulespb"
24+
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
2525
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
2626
"github.com/cortexproject/cortex/pkg/tenant"
2727
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -405,7 +405,7 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
405405
return
406406
}
407407

408-
err = a.store.LoadRuleGroups(req.Context(), map[string]rulestore.RuleGroupList{userID: rgs})
408+
err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs})
409409
if err != nil {
410410
http.Error(w, err.Error(), http.StatusBadRequest)
411411
return
@@ -435,7 +435,7 @@ func (a *API) GetRuleGroup(w http.ResponseWriter, req *http.Request) {
435435
return
436436
}
437437

438-
formatted := store.FromProto(rg)
438+
formatted := rulespb.FromProto(rg)
439439
marshalAndSend(formatted, w, logger)
440440
}
441441

@@ -495,7 +495,7 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {
495495
return
496496
}
497497

498-
rgProto := store.ToProto(userID, namespace, rg)
498+
rgProto := rulespb.ToProto(userID, namespace, rg)
499499

500500
level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String())
501501
err = a.store.SetRuleGroup(req.Context(), userID, namespace, rgProto)

pkg/ruler/api_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/stretchr/testify/require"
1717
"github.com/weaveworks/common/user"
1818

19-
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
19+
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
2020
"github.com/cortexproject/cortex/pkg/util/services"
2121
)
2222

@@ -171,7 +171,7 @@ func TestRuler_alerts(t *testing.T) {
171171
}
172172

173173
func TestRuler_Create(t *testing.T) {
174-
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulestore.RuleGroupList)))
174+
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulespb.RuleGroupList)))
175175
defer cleanup()
176176

177177
r, rcleanup := newTestRuler(t, cfg)
@@ -301,7 +301,7 @@ func TestRuler_DeleteNamespace(t *testing.T) {
301301
}
302302

303303
func TestRuler_Limits(t *testing.T) {
304-
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulestore.RuleGroupList)))
304+
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulespb.RuleGroupList)))
305305
defer cleanup()
306306

307307
r, rcleanup := newTestRuler(t, cfg)

pkg/ruler/manager.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/weaveworks/common/user"
2020
"golang.org/x/net/context/ctxhttp"
2121

22-
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
22+
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
2323
)
2424

2525
type DefaultMultiTenantManager struct {
@@ -91,7 +91,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg
9191
}, nil
9292
}
9393

94-
func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulestore.RuleGroupList) {
94+
func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
9595
// A lock is taken to ensure that, if this function is called concurrently, each call
9696
// returns only after it has mapped the rule files and checked for updates.
9797
r.userManagerMtx.Lock()
@@ -121,7 +121,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
121121

122122
// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
123123
// user's Prometheus Rules Manager.
124-
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulestore.RuleGroupList) {
124+
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
125125
// Map the files to disk and return the file names to be passed to the user's manager if they
126126
// have been updated
127127
update, files, err := r.mapper.MapRules(user, groups.Formatted())

pkg/ruler/manager_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"go.uber.org/atomic"
1717

1818
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
19-
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
2019
"github.com/cortexproject/cortex/pkg/util/test"
2120
)
2221

@@ -32,7 +31,7 @@ func TestSyncRuleGroups(t *testing.T) {
3231

3332
const user = "testUser"
3433

35-
userRules := map[string]rulestore.RuleGroupList{
34+
userRules := map[string]rulespb.RuleGroupList{
3635
user: {
3736
&rulespb.RuleGroupDesc{
3837
Name: "group1",

pkg/ruler/ruler.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
170170
type MultiTenantManager interface {
171171
// SyncRuleGroups is used to sync the Manager with rules from the RuleStore.
172172
// If existing user is missing in the ruleGroups map, its ruler manager will be stopped.
173-
SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulestore.RuleGroupList)
173+
SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList)
174174
// GetRules fetches rules for a particular tenant (userID).
175175
GetRules(userID string) []*promRules.Group
176176
// Stop stops all Manager components.
@@ -470,7 +470,7 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
470470
r.manager.SyncRuleGroups(ctx, configs)
471471
}
472472

473-
func (r *Ruler) listRules(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
473+
func (r *Ruler) listRules(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
474474
switch {
475475
case !r.cfg.EnableSharding:
476476
return r.listRulesNoSharding(ctx)
@@ -486,17 +486,17 @@ func (r *Ruler) listRules(ctx context.Context) (map[string]rulestore.RuleGroupLi
486486
}
487487
}
488488

489-
func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
489+
func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
490490
return r.store.ListAllRuleGroups(ctx)
491491
}
492492

493-
func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
493+
func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
494494
configs, err := r.store.ListAllRuleGroups(ctx)
495495
if err != nil {
496496
return nil, err
497497
}
498498

499-
filteredConfigs := make(map[string]rulestore.RuleGroupList)
499+
filteredConfigs := make(map[string]rulespb.RuleGroupList)
500500
for userID, groups := range configs {
501501
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
502502
if len(filtered) > 0 {
@@ -506,7 +506,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulest
506506
return filteredConfigs, nil
507507
}
508508

509-
func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
509+
func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
510510
users, err := r.store.ListAllUsers(ctx)
511511
if err != nil {
512512
return nil, errors.Wrap(err, "unable to list users of ruler")
@@ -540,7 +540,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulest
540540
close(userCh)
541541

542542
mu := sync.Mutex{}
543-
result := map[string]rulestore.RuleGroupList{}
543+
result := map[string]rulespb.RuleGroupList{}
544544

545545
concurrency := loadRulesConcurrency
546546
if len(userRings) < concurrency {

pkg/ruler/ruler_test.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -261,15 +261,15 @@ func TestSharding(t *testing.T) {
261261
user2Group1Token := tokenForGroup(user2Group1)
262262
user3Group1Token := tokenForGroup(user3Group1)
263263

264-
noRules := map[string]rulestore.RuleGroupList{}
265-
allRules := map[string]rulestore.RuleGroupList{
264+
noRules := map[string]rulespb.RuleGroupList{}
265+
allRules := map[string]rulespb.RuleGroupList{
266266
user1: {user1Group1, user1Group2},
267267
user2: {user2Group1},
268268
user3: {user3Group1},
269269
}
270270

271271
// ruler ID -> (user ID -> list of groups).
272-
type expectedRulesMap map[string]map[string]rulestore.RuleGroupList
272+
type expectedRulesMap map[string]map[string]rulespb.RuleGroupList
273273

274274
type testCase struct {
275275
sharding bool
@@ -321,12 +321,12 @@ func TestSharding(t *testing.T) {
321321
},
322322

323323
expectedRules: expectedRulesMap{
324-
ruler1: map[string]rulestore.RuleGroupList{
324+
ruler1: map[string]rulespb.RuleGroupList{
325325
user1: {user1Group1},
326326
user2: {user2Group1},
327327
},
328328

329-
ruler2: map[string]rulestore.RuleGroupList{
329+
ruler2: map[string]rulespb.RuleGroupList{
330330
user1: {user1Group2},
331331
user3: {user3Group1},
332332
},
@@ -349,7 +349,7 @@ func TestSharding(t *testing.T) {
349349

350350
expectedRules: expectedRulesMap{
351351
// This ruler doesn't get rules from unhealthy ruler (RF=1).
352-
ruler1: map[string]rulestore.RuleGroupList{
352+
ruler1: map[string]rulespb.RuleGroupList{
353353
user1: {user1Group1},
354354
user2: {user2Group1},
355355
},
@@ -447,10 +447,10 @@ func TestSharding(t *testing.T) {
447447
},
448448

449449
expectedRules: expectedRulesMap{
450-
ruler1: map[string]rulestore.RuleGroupList{
450+
ruler1: map[string]rulespb.RuleGroupList{
451451
user1: {user1Group1, user1Group2},
452452
},
453-
ruler2: map[string]rulestore.RuleGroupList{
453+
ruler2: map[string]rulespb.RuleGroupList{
454454
user2: {user2Group1},
455455
user3: {user3Group1},
456456
},
@@ -468,13 +468,13 @@ func TestSharding(t *testing.T) {
468468
},
469469

470470
expectedRules: expectedRulesMap{
471-
ruler1: map[string]rulestore.RuleGroupList{
471+
ruler1: map[string]rulespb.RuleGroupList{
472472
user1: {user1Group1},
473473
},
474-
ruler2: map[string]rulestore.RuleGroupList{
474+
ruler2: map[string]rulespb.RuleGroupList{
475475
user1: {user1Group2},
476476
},
477-
ruler3: map[string]rulestore.RuleGroupList{
477+
ruler3: map[string]rulespb.RuleGroupList{
478478
user2: {user2Group1},
479479
user3: {user3Group1},
480480
},
@@ -492,11 +492,11 @@ func TestSharding(t *testing.T) {
492492
},
493493

494494
expectedRules: expectedRulesMap{
495-
ruler1: map[string]rulestore.RuleGroupList{
495+
ruler1: map[string]rulespb.RuleGroupList{
496496
user1: {user1Group1, user1Group2},
497497
},
498498
ruler2: noRules, // Ruler2 owns token for user2group1, but user-2 will only be handled by ruler-1 and 3.
499-
ruler3: map[string]rulestore.RuleGroupList{
499+
ruler3: map[string]rulespb.RuleGroupList{
500500
user2: {user2Group1},
501501
user3: {user3Group1},
502502
},
@@ -583,7 +583,7 @@ func TestSharding(t *testing.T) {
583583
require.NoError(t, err)
584584
// Normalize nil map to empty one.
585585
if loaded == nil {
586-
loaded = map[string]rulestore.RuleGroupList{}
586+
loaded = map[string]rulespb.RuleGroupList{}
587587
}
588588
expected[id] = loaded
589589
}

pkg/ruler/rulespb/custom.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package rulespb
2+
3+
import "github.com/prometheus/prometheus/pkg/rulefmt"
4+
5+
// RuleGroupList contains a set of rule groups
6+
type RuleGroupList []*RuleGroupDesc
7+
8+
// Formatted returns the rule group list as a set of formatted rule groups mapped
9+
// by namespace
10+
func (l RuleGroupList) Formatted() map[string][]rulefmt.RuleGroup {
11+
ruleMap := map[string][]rulefmt.RuleGroup{}
12+
for _, g := range l {
13+
if _, exists := ruleMap[g.Namespace]; !exists {
14+
ruleMap[g.Namespace] = []rulefmt.RuleGroup{FromProto(g)}
15+
continue
16+
}
17+
ruleMap[g.Namespace] = append(ruleMap[g.Namespace], FromProto(g))
18+
19+
}
20+
return ruleMap
21+
}

pkg/ruler/rulestore/bucketclient/bucket_client.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
9797
}
9898

9999
// ListAllRuleGroups implements rules.RuleStore.
100-
func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
101-
out := map[string]rulestore.RuleGroupList{}
100+
func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
101+
out := map[string]rulespb.RuleGroupList{}
102102

103103
// List rule groups for all tenants.
104104
err := b.bucket.Iter(ctx, "", func(key string) error {
@@ -126,10 +126,10 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul
126126
}
127127

128128
// ListRuleGroupsForUserAndNamespace implements rules.RuleStore.
129-
func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulestore.RuleGroupList, error) {
129+
func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) {
130130
userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
131131

132-
groupList := rulestore.RuleGroupList{}
132+
groupList := rulespb.RuleGroupList{}
133133

134134
// The prefix to list objects depends on whether the namespace has been
135135
// specified in the request.
@@ -162,7 +162,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
162162
}
163163

164164
// LoadRuleGroups implements rules.RuleStore.
165-
func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulestore.RuleGroupList) error {
165+
func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error {
166166
ch := make(chan *rulespb.RuleGroupDesc)
167167

168168
// Given we store one file per rule group. With this, we create a pool of workers that will

pkg/ruler/rulestore/config.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,24 @@ import (
44
"flag"
55

66
"github.com/cortexproject/cortex/pkg/configs/client"
7+
"github.com/cortexproject/cortex/pkg/ruler/rulestore/configdb"
8+
"github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
79
"github.com/cortexproject/cortex/pkg/storage/bucket"
810
)
911

10-
const (
11-
ConfigDB = "configdb"
12-
13-
Name = "ruler-storage"
14-
prefix = "ruler-storage."
15-
)
16-
1712
// Config configures a rule store.
1813
type Config struct {
1914
bucket.Config `yaml:",inline"`
2015
ConfigDB client.Config `yaml:"configdb"`
16+
Local local.Config `yaml:"local"`
2117
}
2218

2319
// RegisterFlags registers the backend storage config.
2420
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
25-
cfg.ExtraBackends = []string{ConfigDB}
21+
prefix := "ruler-storage."
22+
23+
cfg.ExtraBackends = []string{configdb.Name, local.Name}
2624
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
25+
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
2726
cfg.RegisterFlagsWithPrefix(prefix, f)
2827
}

0 commit comments

Comments
 (0)