
Commit 71bfa41

Ruler ignore users (#4074)
* Extracted AllowedUsers from Compactor code.
* Added support for enabled/disabled tenants in ruler.
* CHANGELOG.md
* CHANGELOG.md and documentation
* Removed obsolete test.
* Make lint happy.
* Review feedback.
* Fix field names.

Signed-off-by: Peter Štibraný <[email protected]>
1 parent dd7fc29 commit 71bfa41

File tree: 8 files changed (+277, -95 lines)

CHANGELOG.md (+1)

@@ -38,6 +38,7 @@
 * [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
 * [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
 * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
+* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959

docs/configuration/config-file-reference.md (+12)

@@ -1584,6 +1584,18 @@ ring:
 # Enable the ruler api
 # CLI flag: -experimental.ruler.enable-api
 [enable_api: <boolean> | default = false]
+
+# Comma separated list of tenants whose rules this ruler can evaluate. If
+# specified, only these tenants will be handled by ruler, otherwise this ruler
+# can process rules from all tenants. Subject to sharding.
+# CLI flag: -ruler.enabled-tenants
+[enabled_tenants: <string> | default = ""]
+
+# Comma separated list of tenants whose rules this ruler cannot evaluate. If
+# specified, a ruler that would normally pick the specified tenant(s) for
+# processing will ignore them instead. Subject to sharding.
+# CLI flag: -ruler.disabled-tenants
+[disabled_tenants: <string> | default = ""]
 ```

 ### `ruler_storage_config`
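For illustration, two hypothetical `ruler` blocks of a Cortex config file using these new options (the tenant IDs are made up; the equivalent CLI flags are documented in the reference above). An allow-list ruler that only evaluates rules for two tenants:

ruler:
  # Hypothetical tenant IDs; still subject to sharding.
  enabled_tenants: "team-a,team-b"

and a catch-all ruler that skips one tenant it would otherwise pick up:

ruler:
  # Hypothetical tenant ID; ignored even if sharding assigns it here.
  disabled_tenants: "team-x"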

pkg/compactor/compactor.go (+11, -42)

@@ -172,18 +172,13 @@ type ConfigProvider interface {
 type Compactor struct {
 	services.Service
 
-	compactorCfg Config
-	storageCfg   cortex_tsdb.BlocksStorageConfig
-	cfgProvider  ConfigProvider
-	logger       log.Logger
-	parentLogger log.Logger
-	registerer   prometheus.Registerer
-
-	// If empty, all users are enabled. If not empty, only users in the map are enabled (possibly owned by compactor, also subject to sharding configuration).
-	enabledUsers map[string]struct{}
-
-	// If empty, no users are disabled. If not empty, users in the map are disabled (not owned by this compactor).
-	disabledUsers map[string]struct{}
+	compactorCfg   Config
+	storageCfg     cortex_tsdb.BlocksStorageConfig
+	cfgProvider    ConfigProvider
+	logger         log.Logger
+	parentLogger   log.Logger
+	registerer     prometheus.Registerer
+	allowedTenants *util.AllowedTenants
 
 	// Functions that creates bucket client, grouper, planner and compactor using the context.
 	// Useful for injecting mock objects from tests.

@@ -272,6 +267,7 @@ func newCompactor(
 		bucketClientFactory:    bucketClientFactory,
 		blocksGrouperFactory:   blocksGrouperFactory,
 		blocksCompactorFactory: blocksCompactorFactory,
+		allowedTenants:         util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
 
 		compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_runs_started_total",

@@ -321,21 +317,10 @@
 	}
 
 	if len(compactorCfg.EnabledTenants) > 0 {
-		c.enabledUsers = map[string]struct{}{}
-		for _, u := range compactorCfg.EnabledTenants {
-			c.enabledUsers[u] = struct{}{}
-		}
-
-		level.Info(c.logger).Log("msg", "using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
+		level.Info(c.logger).Log("msg", "compactor using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
 	}
-
 	if len(compactorCfg.DisabledTenants) > 0 {
-		c.disabledUsers = map[string]struct{}{}
-		for _, u := range compactorCfg.DisabledTenants {
-			c.disabledUsers[u] = struct{}{}
-		}
-
-		level.Info(c.logger).Log("msg", "using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
+		level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
 	}
 
 	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)

@@ -711,7 +696,7 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
 }
 
 func (c *Compactor) ownUser(userID string) (bool, error) {
-	if !isAllowedUser(c.enabledUsers, c.disabledUsers, userID) {
+	if !c.allowedTenants.IsAllowed(userID) {
 		return false, nil
 	}
 

@@ -738,22 +723,6 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
 	return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil
 }
 
-func isAllowedUser(enabledUsers, disabledUsers map[string]struct{}, userID string) bool {
-	if len(enabledUsers) > 0 {
-		if _, ok := enabledUsers[userID]; !ok {
-			return false
-		}
-	}
-
-	if len(disabledUsers) > 0 {
-		if _, ok := disabledUsers[userID]; ok {
-			return false
-		}
-	}
-
-	return true
-}
-
 const compactorMetaPrefix = "compactor-meta-"
 
 // metaSyncDirForUser returns directory to store cached meta files.
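The extracted util.AllowedTenants helper itself lives in one of the changed files not expanded on this page. Based on the removed isAllowedUser logic above and the util.NewAllowedTenants/IsAllowed call sites in the compactor and ruler, a minimal sketch of its likely shape (everything beyond those two signatures is an assumption):

package util

// AllowedTenants is a sketch of the helper extracted in this commit; the
// semantics are taken from the removed isAllowedUser function shown above.
type AllowedTenants struct {
	// If empty, all tenants are allowed (subject to the disabled map below).
	enabled map[string]struct{}

	// Tenants in this map are never allowed, even if also listed as enabled.
	disabled map[string]struct{}
}

// NewAllowedTenants builds the lookup maps from the configured lists
// (flagext.StringSliceCSV is a []string, so plain slices work here).
func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants {
	a := &AllowedTenants{}

	if len(enabled) > 0 {
		a.enabled = make(map[string]struct{}, len(enabled))
		for _, t := range enabled {
			a.enabled[t] = struct{}{}
		}
	}

	if len(disabled) > 0 {
		a.disabled = make(map[string]struct{}, len(disabled))
		for _, t := range disabled {
			a.disabled[t] = struct{}{}
		}
	}

	return a
}

// IsAllowed reports whether the tenant passes both filters: it must be in the
// enabled map (when one was configured) and absent from the disabled map.
func (a *AllowedTenants) IsAllowed(tenantID string) bool {
	if len(a.enabled) > 0 {
		if _, ok := a.enabled[tenantID]; !ok {
			return false
		}
	}

	if _, ok := a.disabled[tenantID]; ok {
		return false
	}

	return true
}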

pkg/compactor/compactor_test.go (-42)

@@ -1152,48 +1152,6 @@ func mockDeletionMarkJSON(id string, deletionTime time.Time) string {
 	return string(content)
 }
 
-func TestAllowedUser(t *testing.T) {
-	testCases := map[string]struct {
-		enabled, disabled map[string]struct{}
-		user              string
-		expected          bool
-	}{
-		"no enabled or disabled": {
-			user:     "test",
-			expected: true,
-		},
-
-		"only enabled, enabled": {
-			enabled:  map[string]struct{}{"user": {}},
-			user:     "user",
-			expected: true,
-		},
-
-		"only enabled, disabled": {
-			enabled:  map[string]struct{}{"user": {}},
-			user:     "not user",
-			expected: false,
-		},
-
-		"only disabled, disabled": {
-			disabled: map[string]struct{}{"user": {}},
-			user:     "user",
-			expected: false,
-		},
-
-		"only disabled, enabled": {
-			disabled: map[string]struct{}{"user": {}},
-			user:     "not user",
-			expected: true,
-		},
-	}
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
-			require.Equal(t, tc.expected, isAllowedUser(tc.enabled, tc.disabled, tc.user))
-		})
-	}
-}
-
 func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
 	numUsers := 10
 
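The "Removed obsolete test" commit suggests this coverage moved alongside the extracted helper into a file not expanded on this page. A sketch of what the equivalent table test re-targeted at util.AllowedTenants could look like (the package location and test name are assumptions):

package util_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/pkg/util"
)

// Sketch of the removed table test rewritten against the extracted helper;
// the actual replacement test is not shown in this diff.
func TestAllowedTenants(t *testing.T) {
	testCases := map[string]struct {
		enabled, disabled []string
		user              string
		expected          bool
	}{
		"no enabled or disabled":  {user: "test", expected: true},
		"only enabled, enabled":   {enabled: []string{"user"}, user: "user", expected: true},
		"only enabled, disabled":  {enabled: []string{"user"}, user: "not user", expected: false},
		"only disabled, disabled": {disabled: []string{"user"}, user: "user", expected: false},
		"only disabled, enabled":  {disabled: []string{"user"}, user: "not user", expected: true},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			require.Equal(t, tc.expected, util.NewAllowedTenants(tc.enabled, tc.disabled).IsAllowed(tc.user))
		})
	}
}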
pkg/ruler/ruler.go (+39, -11)

@@ -107,6 +107,9 @@ type Config struct {
 
 	EnableAPI bool `yaml:"enable_api"`
 
+	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
+	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
+
 	RingCheckPeriod time.Duration `yaml:"-"`
 }
 

@@ -163,6 +166,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`)
 	f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`)
 
+	f.Var(&cfg.EnabledTenants, "ruler.enabled-tenants", "Comma separated list of tenants whose rules this ruler can evaluate. If specified, only these tenants will be handled by ruler, otherwise this ruler can process rules from all tenants. Subject to sharding.")
+	f.Var(&cfg.DisabledTenants, "ruler.disabled-tenants", "Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.")
+
 	cfg.RingCheckPeriod = 5 * time.Second
 }
 

@@ -224,20 +230,23 @@ type Ruler struct {
 	ringCheckErrors prometheus.Counter
 	rulerSync       *prometheus.CounterVec
 
+	allowedTenants *util.AllowedTenants
+
 	registry prometheus.Registerer
 	logger   log.Logger
 }
 
 // NewRuler creates a new ruler from a distributor and chunk store.
 func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
 	ruler := &Ruler{
-		cfg:         cfg,
-		store:       ruleStore,
-		manager:     manager,
-		registry:    reg,
-		logger:      logger,
-		limits:      limits,
-		clientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+		cfg:            cfg,
+		store:          ruleStore,
+		manager:        manager,
+		registry:       reg,
+		logger:         logger,
+		limits:         limits,
+		clientsPool:    newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+		allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
 
 		ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ruler_ring_check_errors_total",

@@ -250,6 +259,13 @@ func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
 		}, []string{"reason"}),
 	}
 
+	if len(cfg.EnabledTenants) > 0 {
+		level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
+	}
+	if len(cfg.DisabledTenants) > 0 {
+		level.Info(ruler.logger).Log("msg", "ruler using disabled users", "disabled", strings.Join(cfg.DisabledTenants, ", "))
+	}
+
 	if cfg.EnableSharding {
 		ringStore, err := kv.NewClient(
 			cfg.Ring.KVStore,

@@ -472,20 +488,32 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
 	r.manager.SyncRuleGroups(ctx, configs)
 }
 
-func (r *Ruler) listRules(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
+func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) {
 	switch {
 	case !r.cfg.EnableSharding:
-		return r.listRulesNoSharding(ctx)
+		result, err = r.listRulesNoSharding(ctx)
 
 	case r.cfg.ShardingStrategy == util.ShardingStrategyDefault:
-		return r.listRulesShardingDefault(ctx)
+		result, err = r.listRulesShardingDefault(ctx)
 
 	case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle:
-		return r.listRulesShuffleSharding(ctx)
+		result, err = r.listRulesShuffleSharding(ctx)
 
 	default:
 		return nil, errors.New("invalid sharding configuration")
 	}
+
+	if err != nil {
+		return
+	}
+
+	for userID := range result {
+		if !r.allowedTenants.IsAllowed(userID) {
+			level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID)
+			delete(result, userID)
+		}
+	}
+	return
 }
 
 func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
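To make the filtering semantics that listRules and ownUser rely on concrete, here is a small runnable example (tenant IDs are hypothetical, and it assumes the helper landed in pkg/util, as the util.NewAllowedTenants call sites suggest). It shows how the two lists combine: the disabled list always wins, and a non-empty enabled list excludes everything not on it.

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/util"
)

func main() {
	// Equivalent of running a ruler with hypothetical tenant IDs:
	//   -ruler.enabled-tenants=team-a,team-b -ruler.disabled-tenants=team-b
	allowed := util.NewAllowedTenants([]string{"team-a", "team-b"}, []string{"team-b"})

	fmt.Println(allowed.IsAllowed("team-a")) // true: enabled and not disabled
	fmt.Println(allowed.IsAllowed("team-b")) // false: the disabled list always wins
	fmt.Println(allowed.IsAllowed("team-c")) // false: not in the non-empty enabled list
}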
