Dispatcher groups limits #4254

Merged 6 commits on Jun 9, 2021
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
* `memberlist_client_kv_store_value_tombstones`
* `memberlist_client_kv_store_value_tombstones_removed_total`
* `memberlist_client_messages_to_broadcast_dropped_total`
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.max-dispatcher-aggregation-groups` option to control the maximum number of active dispatcher groups in Alertmanager (per tenant, also overridable). When the limit is reached, the dispatcher logs a message and increments the `cortex_alertmanager_dispatcher_aggregation_group_limit_reached_total` metric. #4254
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
* [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176
* [BUGFIX] Alertmanager: fix Alertmanager status page if clustering via gossip is disabled or sharding is enabled. #4184
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4172,6 +4172,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# uploaded via Alertmanager API. 0 = no limit.
# CLI flag: -alertmanager.max-template-size-bytes
[alertmanager_max_template_size_bytes: <int> | default = 0]

# Maximum number of aggregation groups in Alertmanager's dispatcher that a
# tenant can have. Each active aggregation group uses a single goroutine. When the
# limit is reached, dispatcher will not dispatch alerts that belong to
# additional aggregation groups, but existing groups will keep working properly.
# 0 = no limit.
# CLI flag: -alertmanager.max-dispatcher-aggregation-groups
[alertmanager_max_dispatcher_aggregation_groups: <int> | default = 0]
```
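The behaviour described for `alertmanager_max_dispatcher_aggregation_groups` can be illustrated with a minimal sketch. This is not the upstream dispatcher code: `groupLimiter`, `newGroupLimiter`, and `admit` are hypothetical names, and the sketch only assumes what the option text states, namely that alerts for existing groups keep flowing, that a new group is refused once the limit is reached, and that 0 disables the limit.

```go
package main

import (
	"fmt"
	"sync"
)

// groupLimiter is a hypothetical stand-in for the dispatcher's bookkeeping.
// It is not the upstream Alertmanager type.
type groupLimiter struct {
	mu       sync.Mutex
	limit    func() int          // returns the current limit; 0 means no limit
	groups   map[string]struct{} // active aggregation groups, keyed by group key
	rejected int                 // alerts refused because the limit was reached
}

func newGroupLimiter(limit func() int) *groupLimiter {
	return &groupLimiter{limit: limit, groups: map[string]struct{}{}}
}

// admit reports whether an alert for the given group may be dispatched.
// Alerts for existing groups are always admitted. A new group is created
// only while the number of active groups is below the limit.
func (g *groupLimiter) admit(groupKey string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()

	if _, ok := g.groups[groupKey]; ok {
		return true
	}
	if n := g.limit(); n > 0 && len(g.groups) >= n {
		g.rejected++
		return false
	}
	g.groups[groupKey] = struct{}{}
	return true
}

func main() {
	l := newGroupLimiter(func() int { return 3 })
	for i := 0; i < 5; i++ {
		key := fmt.Sprintf("Alert-%d", i)
		// Two alerts per group, mirroring the test added in this change.
		first, second := l.admit(key), l.admit(key)
		fmt.Println(key, first, second)
	}
	fmt.Println("rejected:", l.rejected)
}
```

With a limit of 3 and five groups of two alerts each, the sketch admits the first three groups and counts four rejected alerts, which matches the "low limit" case in the test further down.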

### `redis_config`
7 changes: 6 additions & 1 deletion docs/configuration/v1-guarantees.md
@@ -71,8 +71,13 @@ Currently experimental features are:
- `-ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
- Instance limits in ingester and distributor
- Exemplar storage, currently in-memory only within the Ingester based on Prometheus exemplar storage (`-blocks-storage.tsdb.max-exemplars`)
- - Alertmanager: notification rate limits. (`-alertmanager.notification-rate-limit` and `-alertmanager.notification-rate-limit-per-integration`)
- Querier limits:
- `-querier.max-fetched-chunks-per-query`
- `-querier.max-fetched-chunk-bytes-per-query`
- `-querier.max-fetched-series-per-query`
+ - Alertmanager limits
+   - notification rate (`-alertmanager.notification-rate-limit` and `-alertmanager.notification-rate-limit-per-integration`)
+   - dispatcher groups (`-alertmanager.max-dispatcher-aggregation-groups`)
+   - user config size (`-alertmanager.max-config-size-bytes`)
+   - templates count in user config (`-alertmanager.max-templates-count`)
+   - max template size (`-alertmanager.max-template-size-bytes`)
13 changes: 11 additions & 2 deletions pkg/alertmanager/alertmanager.go
@@ -278,7 +278,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
am.mux.Handle(a, http.NotFoundHandler())
}

- am.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, am.registry)
+ am.dispatcherMetrics = dispatch.NewDispatcherMetrics(true, am.registry)

//TODO: From this point onward, the alertmanager _might_ receive requests - we need to make sure we've settled and are ready.
return am, nil
@@ -382,7 +382,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
pipeline,
am.marker,
timeoutFunc,
- nil,
+ &dispatcherLimits{tenant: am.cfg.UserID, limits: am.cfg.Limits},
log.With(am.logger, "component", "dispatcher"),
am.dispatcherMetrics,
)
@@ -575,3 +575,12 @@ func (t *tenantRateLimits) RateLimit() rate.Limit {
func (t *tenantRateLimits) Burst() int {
return t.limits.NotificationBurstSize(t.tenant, t.integration)
}

type dispatcherLimits struct {
tenant string
limits Limits
}

func (g *dispatcherLimits) MaxNumberOfAggregationGroups() int {
return g.limits.AlertmanagerMaxDispatcherAggregationGroups(g.tenant)
}
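The `dispatcherLimits` type above is a small adapter: it binds a tenant ID to the shared `Limits` so the dispatcher can ask for a limit without knowing about tenants. The sketch below reproduces that shape in isolation. `mutableLimits` is a hypothetical in-memory implementation introduced only for illustration, and the `Limits` interface is reduced to the one method used here.

```go
package main

import "fmt"

// Limits is reduced to the single method this adapter needs.
type Limits interface {
	AlertmanagerMaxDispatcherAggregationGroups(tenant string) int
}

// dispatcherLimits binds a tenant to Limits, as in the diff above.
type dispatcherLimits struct {
	tenant string
	limits Limits
}

// MaxNumberOfAggregationGroups looks the limit up on every call, so in this
// sketch a changed limit is visible without rebuilding the adapter.
func (g *dispatcherLimits) MaxNumberOfAggregationGroups() int {
	return g.limits.AlertmanagerMaxDispatcherAggregationGroups(g.tenant)
}

// mutableLimits is a hypothetical Limits backed by a map (tenant -> limit).
type mutableLimits map[string]int

func (m mutableLimits) AlertmanagerMaxDispatcherAggregationGroups(tenant string) int {
	return m[tenant]
}

func main() {
	limits := mutableLimits{"tenant-a": 3}
	dl := &dispatcherLimits{tenant: "tenant-a", limits: limits}

	fmt.Println(dl.MaxNumberOfAggregationGroups()) // limit before the change

	limits["tenant-a"] = 10
	fmt.Println(dl.MaxNumberOfAggregationGroups()) // limit after the change
}
```

Because the adapter holds a reference to `Limits` rather than a copied number, the second call in `main` returns the updated value. Whether the real `Limits` implementation changes at runtime depends on how overrides are loaded, which this sketch does not model.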
9 changes: 8 additions & 1 deletion pkg/alertmanager/alertmanager_metrics.go
@@ -59,7 +59,8 @@ type alertmanagerMetrics struct {
persistTotal *prometheus.Desc
persistFailed *prometheus.Desc

- notificationRateLimited *prometheus.Desc
+ notificationRateLimited *prometheus.Desc
+ dispatcherAggregationGroupsLimitReached *prometheus.Desc
}

func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -209,6 +210,10 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
"cortex_alertmanager_notification_rate_limited_total",
"Total number of rate-limited notifications per integration.",
[]string{"user", "integration"}, nil),
dispatcherAggregationGroupsLimitReached: prometheus.NewDesc(
"cortex_alertmanager_dispatcher_aggregation_group_limit_reached_total",
"Number of times when dispatcher failed to create new aggregation group due to limit.",
[]string{"user"}, nil),
}
}

@@ -259,6 +264,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.persistTotal
out <- m.persistFailed
out <- m.notificationRateLimited
out <- m.dispatcherAggregationGroupsLimitReached
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -306,4 +312,5 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfCounters(out, m.persistFailed, "alertmanager_state_persist_failed_total")

data.SendSumOfCountersPerUserWithLabels(out, m.notificationRateLimited, "alertmanager_notification_rate_limited_total", "integration")
data.SendSumOfCountersPerUser(out, m.dispatcherAggregationGroupsLimitReached, "alertmanager_dispatcher_aggregation_group_limit_reached_total")
}
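The `Collect` change above re-exports a per-tenant counter (`alertmanager_dispatcher_aggregation_group_limit_reached_total`) as a `cortex_`-prefixed metric with a `user` label. The following is a rough sketch of that aggregation idea using plain maps instead of Prometheus registries. `tenantCounters`, `sample`, and `sumCountersPerUser` are hypothetical names, and this is not how the Cortex helper is implemented internally.

```go
package main

import (
	"fmt"
	"sort"
)

// tenantCounters models one tenant's registry: metric name -> counter values.
type tenantCounters map[string][]float64

// sample is one exported series with a "user" label.
type sample struct {
	name  string
	user  string
	value float64
}

// sumCountersPerUser sums the source counter inside each tenant and emits
// one sample per tenant under the exported name.
func sumCountersPerUser(perTenant map[string]tenantCounters, source, exported string) []sample {
	users := make([]string, 0, len(perTenant))
	for user := range perTenant {
		users = append(users, user)
	}
	sort.Strings(users) // deterministic output order

	var out []sample
	for _, user := range users {
		values, ok := perTenant[user][source]
		if !ok {
			continue // tenant does not expose this metric
		}
		sum := 0.0
		for _, v := range values {
			sum += v
		}
		out = append(out, sample{name: exported, user: user, value: sum})
	}
	return out
}

func main() {
	const source = "alertmanager_dispatcher_aggregation_group_limit_reached_total"
	perTenant := map[string]tenantCounters{
		"tenant-a": {source: {4}},
		"tenant-b": {source: {1, 2}},
	}
	for _, s := range sumCountersPerUser(perTenant, source, "cortex_"+source) {
		fmt.Printf("%s{user=%q} %g\n", s.name, s.user, s.value)
	}
}
```

The point of the sketch is only the naming and labelling: the per-tenant metric has no `user` label and no `cortex_` prefix, while the aggregated one has both.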
112 changes: 112 additions & 0 deletions pkg/alertmanager/alertmanager_test.go
@@ -0,0 +1,112 @@
package alertmanager

import (
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util/test"
)

func TestDispatcherGroupLimits(t *testing.T) {
for name, tc := range map[string]struct {
groups int
groupsLimit int
expectedFailures int
}{
"no limit": {groups: 5, groupsLimit: 0, expectedFailures: 0},
"high limit": {groups: 5, groupsLimit: 10, expectedFailures: 0},
"low limit": {groups: 5, groupsLimit: 3, expectedFailures: 4}, // 2 groups that fail, 2 alerts per group = 4 failures
} {
t.Run(name, func(t *testing.T) {
createAlertmanagerAndSendAlerts(t, tc.groups, tc.groupsLimit, tc.expectedFailures)
})
}
}

func createAlertmanagerAndSendAlerts(t *testing.T, alertGroups, groupsLimit, expectedFailures int) {
user := "test"

reg := prometheus.NewPedanticRegistry()
am, err := New(&Config{
UserID: user,
Logger: log.NewNopLogger(),
Limits: &mockAlertManagerLimits{maxDispatcherAggregationGroups: groupsLimit},
TenantDataDir: t.TempDir(),
ExternalURL: &url.URL{Path: "/am"},
ShardingEnabled: false,
}, reg)
require.NoError(t, err)
defer am.StopAndWait()

cfgRaw := `receivers:
- name: 'prod'

route:
group_by: ['alertname']
group_wait: 10ms
group_interval: 10ms
receiver: 'prod'`

cfg, err := config.Load(cfgRaw)
require.NoError(t, err)
require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw))

now := time.Now()

for i := 0; i < alertGroups; i++ {
alertName := model.LabelValue(fmt.Sprintf("Alert-%d", i))

inputAlerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": alertName,
"a": "b",
},
Annotations: model.LabelSet{"foo": "bar"},
StartsAt: now,
EndsAt: now.Add(5 * time.Minute),
GeneratorURL: "http://example.com/prometheus",
},
UpdatedAt: now,
Timeout: false,
},

{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": alertName,
"z": "y",
},
Annotations: model.LabelSet{"foo": "bar"},
StartsAt: now,
EndsAt: now.Add(5 * time.Minute),
GeneratorURL: "http://example.com/prometheus",
},
UpdatedAt: now,
Timeout: false,
},
}
require.NoError(t, am.alerts.Put(inputAlerts...))
}

// Give it some time, as alerts are sent to dispatcher asynchronously.
test.Poll(t, 3*time.Second, nil, func() interface{} {
return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP alertmanager_dispatcher_aggregation_group_limit_reached_total Number of times when dispatcher failed to create new aggregation group due to limit.
# TYPE alertmanager_dispatcher_aggregation_group_limit_reached_total counter
alertmanager_dispatcher_aggregation_group_limit_reached_total %d
`, expectedFailures)), "alertmanager_dispatcher_aggregation_group_limit_reached_total")
})
}
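The `expectedFailures` values in the test table follow from simple arithmetic, spelled out in the "low limit" comment: every alert that belongs to a group beyond the limit counts as one failure. A small sketch of that calculation, where `expectedFailures` is a hypothetical helper and not part of the test file:

```go
package main

import "fmt"

// expectedFailures returns how many alerts are refused when `groups`
// distinct aggregation groups, each with `alertsPerGroup` alerts, are sent
// to a dispatcher limited to `limit` groups. A limit of 0 means no limit.
// Assumption: each alert of a refused group is counted exactly once.
func expectedFailures(groups, limit, alertsPerGroup int) int {
	if limit <= 0 || groups <= limit {
		return 0
	}
	return (groups - limit) * alertsPerGroup
}

func main() {
	fmt.Println(expectedFailures(5, 0, 2))  // "no limit" case
	fmt.Println(expectedFailures(5, 10, 2)) // "high limit" case
	fmt.Println(expectedFailures(5, 3, 2))  // "low limit" case
}
```

For the three table entries this yields 0, 0, and 4, the same numbers the test asserts against the counter.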
4 changes: 4 additions & 0 deletions pkg/alertmanager/multitenant.go
@@ -219,6 +219,10 @@ type Limits interface {

// AlertmanagerMaxTemplateSize returns max size of individual template. 0 = no limit.
AlertmanagerMaxTemplateSize(tenant string) int

// AlertmanagerMaxDispatcherAggregationGroups returns the maximum number of aggregation groups in Alertmanager's dispatcher that a tenant can have.
// Each aggregation group consumes a single goroutine. 0 = unlimited.
AlertmanagerMaxDispatcherAggregationGroups(t string) int
}

// A MultitenantAlertmanager manages Alertmanager instances for multiple
15 changes: 10 additions & 5 deletions pkg/alertmanager/multitenant_test.go
@@ -2019,11 +2019,12 @@ func (f *passthroughAlertmanagerClientPool) GetClientFor(addr string) (Client, e
}

type mockAlertManagerLimits struct {
- emailNotificationRateLimit rate.Limit
- emailNotificationBurst int
- maxConfigSize int
- maxTemplatesCount int
- maxSizeOfTemplate int
+ emailNotificationRateLimit rate.Limit
+ emailNotificationBurst int
+ maxConfigSize int
+ maxTemplatesCount int
+ maxSizeOfTemplate int
+ maxDispatcherAggregationGroups int
}

func (m *mockAlertManagerLimits) AlertmanagerMaxConfigSize(tenant string) int {
@@ -2053,3 +2054,7 @@ func (m *mockAlertManagerLimits) NotificationRateLimit(_ string, integration str
func (m *mockAlertManagerLimits) NotificationBurstSize(_ string, integration string) int {
return m.emailNotificationBurst
}

func (m *mockAlertManagerLimits) AlertmanagerMaxDispatcherAggregationGroups(_ string) int {
return m.maxDispatcherAggregationGroups
}
12 changes: 9 additions & 3 deletions pkg/util/validation/limits.go
@@ -106,9 +106,10 @@ type Limits struct {
NotificationRateLimit float64 `yaml:"alertmanager_notification_rate_limit" json:"alertmanager_notification_rate_limit"`
NotificationRateLimitPerIntegration NotificationRateLimitMap `yaml:"alertmanager_notification_rate_limit_per_integration" json:"alertmanager_notification_rate_limit_per_integration"`

- AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"`
- AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"`
- AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"`
+ AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"`
+ AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"`
+ AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"`
+ AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -181,6 +182,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.AlertmanagerMaxConfigSizeBytes, "alertmanager.max-config-size-bytes", 0, "Maximum size of configuration file for Alertmanager that tenant can upload via Alertmanager API. 0 = no limit.")
f.IntVar(&l.AlertmanagerMaxTemplatesCount, "alertmanager.max-templates-count", 0, "Maximum number of templates in tenant's Alertmanager configuration uploaded via Alertmanager API. 0 = no limit.")
f.IntVar(&l.AlertmanagerMaxTemplateSizeBytes, "alertmanager.max-template-size-bytes", 0, "Maximum size of single template in tenant's Alertmanager configuration uploaded via Alertmanager API. 0 = no limit.")
f.IntVar(&l.AlertmanagerMaxDispatcherAggregationGroups, "alertmanager.max-dispatcher-aggregation-groups", 0, "Maximum number of aggregation groups in Alertmanager's dispatcher that a tenant can have. Each active aggregation group uses a single goroutine. When the limit is reached, dispatcher will not dispatch alerts that belong to additional aggregation groups, but existing groups will keep working properly. 0 = no limit.")
}

// Validate the limits config and returns an error if the validation
@@ -605,6 +607,10 @@ func (o *Overrides) AlertmanagerMaxTemplateSize(userID string) int {
return o.getOverridesForUser(userID).AlertmanagerMaxTemplateSizeBytes
}

func (o *Overrides) AlertmanagerMaxDispatcherAggregationGroups(userID string) int {
return o.getOverridesForUser(userID).AlertmanagerMaxDispatcherAggregationGroups
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.ByUserID(userID)
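The new `Overrides` accessor relies on `getOverridesForUser`, whose body is cut off in this view. A minimal sketch of the lookup pattern it suggests is shown below, under stated assumptions: the per-tenant source is simplified to a plain map (the real code calls `ByUserID` on `tenantLimits`), and the fallback to defaults when no per-tenant entry exists is assumed rather than shown in the diff.

```go
package main

import "fmt"

// Limits is reduced to the one field added in this change.
type Limits struct {
	AlertmanagerMaxDispatcherAggregationGroups int
}

// Overrides is a simplified stand-in: tenantLimits is a plain map here,
// which is an assumption made for the sake of a runnable example.
type Overrides struct {
	defaults     *Limits
	tenantLimits map[string]*Limits
}

// getOverridesForUser returns the tenant's limits if present, otherwise the
// defaults. The fallback is an assumption; the diff truncates this function.
func (o *Overrides) getOverridesForUser(userID string) *Limits {
	if l := o.tenantLimits[userID]; l != nil {
		return l
	}
	return o.defaults
}

// AlertmanagerMaxDispatcherAggregationGroups has the same shape as the
// accessor added in the diff.
func (o *Overrides) AlertmanagerMaxDispatcherAggregationGroups(userID string) int {
	return o.getOverridesForUser(userID).AlertmanagerMaxDispatcherAggregationGroups
}

func main() {
	o := &Overrides{
		defaults: &Limits{AlertmanagerMaxDispatcherAggregationGroups: 0},
		tenantLimits: map[string]*Limits{
			"tenant-a": {AlertmanagerMaxDispatcherAggregationGroups: 100},
		},
	}
	fmt.Println(o.AlertmanagerMaxDispatcherAggregationGroups("tenant-a")) // per-tenant override
	fmt.Println(o.AlertmanagerMaxDispatcherAggregationGroups("tenant-b")) // falls back to default
}
```

In this sketch `tenant-a` gets its own limit of 100 while `tenant-b` inherits the default of 0, meaning no limit.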