Ruler ignore users #4074

Merged: 8 commits, Apr 14, 2021
Changes from 6 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
* [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
* [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint returned `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1584,6 +1584,18 @@ ring:
# Enable the ruler api
# CLI flag: -experimental.ruler.enable-api
[enable_api: <boolean> | default = false]

# Comma separated list of tenants whose rules this ruler can evaluate. If
# specified, only these tenants will be handled by ruler, otherwise this ruler
# can process rules from all tenants. Subject to sharding.
# CLI flag: -ruler.enabled-tenants
[enabled_tenants: <string> | default = ""]

# Comma separated list of tenants whose rules this ruler cannot evaluate. If
# specified, and ruler would normally pick given tenant for processing (eg. via
# sharding), tenant will be ignored instead.
# CLI flag: -ruler.disabled-tenants
[disabled_tenants: <string> | default = ""]
```
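Read together, the two options form an allow-list and a deny-list: when `enabled_tenants` is non-empty a tenant must be on it to be evaluated, and a tenant listed in `disabled_tenants` is skipped even if sharding would otherwise assign it to this ruler. A minimal, self-contained Go sketch of that decision rule (the `evaluated` helper is illustrative only, not Cortex code):

```go
package main

import (
	"fmt"
	"strings"
)

// evaluated reports whether the ruler would evaluate rules for tenant, given
// the comma-separated -ruler.enabled-tenants and -ruler.disabled-tenants
// values. Illustrative only; sharding still applies on top of this check.
func evaluated(enabledCSV, disabledCSV, tenant string) bool {
	parse := func(csv string) map[string]struct{} {
		set := map[string]struct{}{}
		if csv == "" {
			return set
		}
		for _, t := range strings.Split(csv, ",") {
			set[strings.TrimSpace(t)] = struct{}{}
		}
		return set
	}
	enabled, disabled := parse(enabledCSV), parse(disabledCSV)

	if len(enabled) > 0 {
		if _, ok := enabled[tenant]; !ok {
			return false // allow-list is set and the tenant is not on it
		}
	}
	if _, ok := disabled[tenant]; ok {
		return false // tenant is explicitly deny-listed
	}
	return true
}

func main() {
	fmt.Println(evaluated("", "", "team-a"))              // true: no lists configured
	fmt.Println(evaluated("team-a,team-b", "", "team-c")) // false: not on the allow-list
	fmt.Println(evaluated("team-a", "team-a", "team-a"))  // false: deny-list wins
}
```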

### `ruler_storage_config`
37 changes: 3 additions & 34 deletions pkg/compactor/compactor.go
@@ -178,12 +178,7 @@ type Compactor struct {
logger log.Logger
parentLogger log.Logger
registerer prometheus.Registerer

// If empty, all users are enabled. If not empty, only users in the map are enabled (possibly owned by compactor, also subject to sharding configuration).
enabledUsers map[string]struct{}

// If empty, no users are disabled. If not empty, users in the map are disabled (not owned by this compactor).
disabledUsers map[string]struct{}
allowedUsers *util.AllowedUsers

// Functions that creates bucket client, grouper, planner and compactor using the context.
// Useful for injecting mock objects from tests.
@@ -272,6 +267,7 @@ func newCompactor(
bucketClientFactory: bucketClientFactory,
blocksGrouperFactory: blocksGrouperFactory,
blocksCompactorFactory: blocksCompactorFactory,
allowedUsers: util.NewAllowedUsers(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),

compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
@@ -321,20 +317,9 @@ }
}

if len(compactorCfg.EnabledTenants) > 0 {
c.enabledUsers = map[string]struct{}{}
for _, u := range compactorCfg.EnabledTenants {
c.enabledUsers[u] = struct{}{}
}

level.Info(c.logger).Log("msg", "using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
}

if len(compactorCfg.DisabledTenants) > 0 {
c.disabledUsers = map[string]struct{}{}
for _, u := range compactorCfg.DisabledTenants {
c.disabledUsers[u] = struct{}{}
}

level.Info(c.logger).Log("msg", "using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
}

@@ -711,7 +696,7 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
}

func (c *Compactor) ownUser(userID string) (bool, error) {
if !isAllowedUser(c.enabledUsers, c.disabledUsers, userID) {
if !c.allowedUsers.IsAllowed(userID) {
return false, nil
}

@@ -738,22 +723,6 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil
}

func isAllowedUser(enabledUsers, disabledUsers map[string]struct{}, userID string) bool {
if len(enabledUsers) > 0 {
if _, ok := enabledUsers[userID]; !ok {
return false
}
}

if len(disabledUsers) > 0 {
if _, ok := disabledUsers[userID]; ok {
return false
}
}

return true
}

const compactorMetaPrefix = "compactor-meta-"

// metaSyncDirForUser returns directory to store cached meta files.
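The per-compactor `enabledUsers`/`disabledUsers` maps and the `isAllowedUser` helper removed above are folded into a shared `util.AllowedUsers` value. Its definition is not among the hunks shown here; judging from the `NewAllowedUsers(...)` and `IsAllowed(...)` calls in this diff, it plausibly looks like the following sketch (an assumption, not the merged code):

```go
package util

// AllowedUsers filters users by an optional allow-list and deny-list.
type AllowedUsers struct {
	// If empty, all users are enabled.
	enabled map[string]struct{}
	// If empty, no users are disabled.
	disabled map[string]struct{}
}

// NewAllowedUsers builds the filter from the enabled/disabled tenant lists.
func NewAllowedUsers(enabled, disabled []string) *AllowedUsers {
	a := &AllowedUsers{}
	if len(enabled) > 0 {
		a.enabled = map[string]struct{}{}
		for _, u := range enabled {
			a.enabled[u] = struct{}{}
		}
	}
	if len(disabled) > 0 {
		a.disabled = map[string]struct{}{}
		for _, u := range disabled {
			a.disabled[u] = struct{}{}
		}
	}
	return a
}

// IsAllowed mirrors the removed isAllowedUser helper: the user must be on the
// enabled list when one is configured, and must not be on the disabled list.
func (a *AllowedUsers) IsAllowed(userID string) bool {
	if len(a.enabled) > 0 {
		if _, ok := a.enabled[userID]; !ok {
			return false
		}
	}
	if _, ok := a.disabled[userID]; ok {
		return false
	}
	return true
}
```

Treating empty lists as "no restriction" preserves the default behaviour: with neither flag set, every user is allowed.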
42 changes: 0 additions & 42 deletions pkg/compactor/compactor_test.go
@@ -1152,48 +1152,6 @@ func mockDeletionMarkJSON(id string, deletionTime time.Time) string {
return string(content)
}

func TestAllowedUser(t *testing.T) {
testCases := map[string]struct {
enabled, disabled map[string]struct{}
user string
expected bool
}{
"no enabled or disabled": {
user: "test",
expected: true,
},

"only enabled, enabled": {
enabled: map[string]struct{}{"user": {}},
user: "user",
expected: true,
},

"only enabled, disabled": {
enabled: map[string]struct{}{"user": {}},
user: "not user",
expected: false,
},

"only disabled, disabled": {
disabled: map[string]struct{}{"user": {}},
user: "user",
expected: false,
},

"only disabled, enabled": {
disabled: map[string]struct{}{"user": {}},
user: "not user",
expected: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expected, isAllowedUser(tc.enabled, tc.disabled, tc.user))
})
}
}

func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
numUsers := 10

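With the allow/deny logic moved into `pkg/util`, the table-driven `TestAllowedUser` removed above presumably moves alongside it. A hypothetical port of the same cases against the `AllowedUsers` sketch from the previous section (not part of the visible diff):

```go
package util

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestAllowedUsers_IsAllowed(t *testing.T) {
	testCases := map[string]struct {
		enabled, disabled []string
		user              string
		expected          bool
	}{
		"no enabled or disabled":  {user: "test", expected: true},
		"only enabled, enabled":   {enabled: []string{"user"}, user: "user", expected: true},
		"only enabled, disabled":  {enabled: []string{"user"}, user: "not user", expected: false},
		"only disabled, disabled": {disabled: []string{"user"}, user: "user", expected: false},
		"only disabled, enabled":  {disabled: []string{"user"}, user: "not user", expected: true},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			require.Equal(t, tc.expected, NewAllowedUsers(tc.enabled, tc.disabled).IsAllowed(tc.user))
		})
	}
}
```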
50 changes: 39 additions & 11 deletions pkg/ruler/ruler.go
@@ -107,6 +107,9 @@ type Config struct {

EnableAPI bool `yaml:"enable_api"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

RingCheckPeriod time.Duration `yaml:"-"`
}

@@ -163,6 +166,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`)
f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`)

f.Var(&cfg.EnabledTenants, "ruler.enabled-tenants", "Comma separated list of tenants whose rules this ruler can evaluate. If specified, only these tenants will be handled by ruler, otherwise this ruler can process rules from all tenants. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "ruler.disabled-tenants", "Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, and ruler would normally pick given tenant for processing (eg. via sharding), tenant will be ignored instead.")

cfg.RingCheckPeriod = 5 * time.Second
}

@@ -224,20 +230,23 @@ type Ruler struct {
ringCheckErrors prometheus.Counter
rulerSync *prometheus.CounterVec

allowedUsers *util.AllowedUsers

registry prometheus.Registerer
logger log.Logger
}

// NewRuler creates a new ruler from a distributor and chunk store.
func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
ruler := &Ruler{
cfg: cfg,
store: ruleStore,
manager: manager,
registry: reg,
logger: logger,
limits: limits,
clientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
cfg: cfg,
store: ruleStore,
manager: manager,
registry: reg,
logger: logger,
limits: limits,
clientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
allowedUsers: util.NewAllowedUsers(cfg.EnabledTenants, cfg.DisabledTenants),

ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ruler_ring_check_errors_total",
@@ -250,6 +259,13 @@ func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
}, []string{"reason"}),
}

if len(cfg.EnabledTenants) > 0 {
level.Info(ruler.logger).Log("msg", "using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
}
if len(cfg.DisabledTenants) > 0 {
level.Info(ruler.logger).Log("msg", "using disabled users", "disabled", strings.Join(cfg.DisabledTenants, ", "))
}

if cfg.EnableSharding {
ringStore, err := kv.NewClient(
cfg.Ring.KVStore,
@@ -472,20 +488,32 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
r.manager.SyncRuleGroups(ctx, configs)
}

func (r *Ruler) listRules(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) {
switch {
case !r.cfg.EnableSharding:
return r.listRulesNoSharding(ctx)
result, err = r.listRulesNoSharding(ctx)

case r.cfg.ShardingStrategy == util.ShardingStrategyDefault:
return r.listRulesShardingDefault(ctx)
result, err = r.listRulesShardingDefault(ctx)

case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle:
return r.listRulesShuffleSharding(ctx)
result, err = r.listRulesShuffleSharding(ctx)

default:
return nil, errors.New("invalid sharding configuration")
}

if err != nil {
return
}

for userID := range result {
if !r.allowedUsers.IsAllowed(userID) {
level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID)
delete(result, userID)
}
}
return
}

func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
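In `listRules` above, whichever sharding branch runs, the added loop post-filters the resulting map with `allowedUsers.IsAllowed`. Deleting entries from a Go map while ranging over it is well defined, so the filter works in place without copying; a standalone sketch of the same pattern, with illustrative names only:

```go
package main

import "fmt"

// dropDisallowed removes entries for users that fail the allow/deny check,
// mirroring the loop added to (*Ruler).listRules. Mutating a map while
// ranging over it is permitted in Go, so no copy is needed.
func dropDisallowed(groups map[string][]string, isAllowed func(string) bool) {
	for userID := range groups {
		if !isAllowed(userID) {
			delete(groups, userID)
		}
	}
}

func main() {
	groups := map[string][]string{
		"team-a": {"alerts.yaml"},
		"team-b": {"alerts.yaml"},
	}
	disabled := map[string]struct{}{"team-b": {}} // e.g. -ruler.disabled-tenants=team-b

	dropDisallowed(groups, func(u string) bool {
		_, blocked := disabled[u]
		return !blocked
	})

	fmt.Println(len(groups)) // 1: only team-a survives
}
```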