
Alertmanager: Rate limit email notifier #4135

Merged: 9 commits, May 6, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* `cortex_alertmanager_state_persist_total`
* `cortex_alertmanager_state_persist_failed_total`
* [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=<n>` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124
* [FEATURE] Alertmanager: Added rate-limits to email notifier. Rate limits can be configured using `-alertmanager.email-notification-rate-limit` and `-alertmanager.email-notification-burst-size`. These limits are applied on individual alertmanagers. Rate-limited email notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128

## 1.9.0 in progress
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4106,6 +4106,17 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# and local multicast addresses.
# CLI flag: -alertmanager.receivers-firewall-block-private-addresses
[alertmanager_receivers_firewall_block_private_addresses: <boolean> | default = false]

# Per-user rate limit for sending email notifications from Alertmanager in
# emails/sec. 0 = rate limit disabled. Negative value = no emails are allowed.
# CLI flag: -alertmanager.email-notification-rate-limit
Contributor:

Are you planning to have separate configuration for each notification type? I think there are some advantages to that in terms of flexibility, but it might make the config a bit complicated if this is extended to all the receivers.

Contributor Author:

> Are you planning to have separate configuration for each notification type?

Yes, that was my plan. Do you suggest using a single rate-limit configuration for all notification types? We can also have one generic rate-limit config for all integrations, with per-integration-type overrides in case they are needed. WDYT?

Would you also suggest using a single rate limiter shared across all notifiers? (I guess not, because e.g. too many webhook notifications could stop email notifications.)

Contributor Author:

> We can also have one generic rate-limit config for all integrations, with per-integration-type overrides in case they are needed.

Ah... we cannot easily distinguish between default values and missing values, so this would be difficult.

Contributor:

> Ah... we cannot easily distinguish between default values and missing values, so this would be difficult.

Can we use pointer values in the config? If pointer is nil then it has not been set. I haven't checked if it's doable, just asking.

Contributor Author (@pstibrany, May 3, 2021):

> Can we use pointer values in the config? If pointer is nil then it has not been set. I haven't checked if it's doable, just asking.

This is doable, but could lead to confusion when trying to set up overrides correctly.

E.g.:

global limits:

  • no defined shared (for all integrations) rate limit (a)
  • defined email rate limit (b)

user A:

  • defined shared (for all integrations) rate limit (c)
  • undefined email rate limit (d)

Now, when computing the rate limit for the email integration for "user A", we can either use their shared limit (c) or the global email rate limit (b). It's unclear which is the better option, and no matter what we choose, we will confuse some people.

I would leave that to a separate discussion and PR for now.

Contributor:

> I would leave that to a separate discussion and PR for now.

I agree on this. @ranton256 What do you think? Some feedback on this would be great. Thanks!

[alertmanager_email_notification_rate_limit: <float> | default = 0]

# Per-user burst size for email notifications. If set to 0, no email
# notifications will be sent, unless rate-limit is disabled, in which case all
# email notifications are allowed.
# CLI flag: -alertmanager.email-notification-burst-size
[alertmanager_email_notification_burst_size: <int> | default = 1]
```
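Purely as an illustration of the pointer-based, per-integration overrides floated in the review thread above (and explicitly deferred to a later PR), here is a sketch in which nil means "not configured". None of the types or functions below exist in Cortex; they only show why the precedence between a user's shared limit (c) and a global per-integration limit (b) is ambiguous.

```go
package main

import "fmt"

// notificationLimits is hypothetical: nil pointers mean "not configured".
type notificationLimits struct {
	AllIntegrations *float64 // shared rate limit for every integration
	Email           *float64 // per-integration override
}

// resolveEmailLimit falls back from the most specific value to the most general.
// The ordering between the user's shared limit (c) and the global email limit (b)
// is exactly the ambiguity raised in the thread.
func resolveEmailLimit(user, global notificationLimits) float64 {
	for _, v := range []*float64{user.Email, user.AllIntegrations, global.Email, global.AllIntegrations} {
		if v != nil {
			return *v
		}
	}
	return 0 // nothing configured anywhere
}

func main() {
	b, c := 5.0, 1.0
	global := notificationLimits{Email: &b}         // (b): global email limit defined
	user := notificationLimits{AllIntegrations: &c} // (c): user-level shared limit defined
	fmt.Println(resolveEmailLimit(user, global))    // 1 with this ordering; 5 if (b) took precedence
}
```

Swapping the order of the two middle candidates flips the result in the reviewer's example, which is exactly the decision the author prefers not to make in this PR.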

### `redis_config`
44 changes: 40 additions & 4 deletions pkg/alertmanager/alertmanager.go
@@ -43,6 +43,7 @@ import (
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"golang.org/x/time/rate"

"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -99,6 +100,9 @@ type Alertmanager struct {
mux *http.ServeMux
registry *prometheus.Registry

// Pipeline created during last ApplyConfig call. Used for testing only.
lastPipeline notify.Stage

// The Dispatcher is the only component we need to recreate when we call ApplyConfig.
// Given its metrics don't have any variable labels we need to re-use the same metrics.
dispatcherMetrics *dispatch.DispatcherMetrics
@@ -107,6 +111,8 @@ type Alertmanager struct {
// Further, in upstream AM, this metric is handled using the config coordinator which we don't use
// hence we need to generate the metric ourselves.
configHashMetric prometheus.Gauge

rateLimitedNotifications *prometheus.CounterVec
}

var (
@@ -155,6 +161,11 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
Name: "alertmanager_config_hash",
Help: "Hash of the currently loaded alertmanager configuration.",
}),

rateLimitedNotifications: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_notification_rate_limited_total",
Help: "Number of rate-limited notifications per integration.",
}, []string{"integration"}), // "integration" is consistent with other alertmanager metrics.
}

am.registry = reg
@@ -325,7 +336,17 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
// Create a firewall bound to the per-tenant config.
firewallDialer := util_net.NewFirewallDialer(newFirewallDialerConfigProvider(userID, am.cfg.Limits))

integrationsMap, err := buildIntegrationsMap(conf.Receivers, tmpl, firewallDialer, am.logger)
integrationsMap, err := buildIntegrationsMap(conf.Receivers, tmpl, firewallDialer, am.logger, func(integrationName string, notifier notify.Notifier) notify.Notifier {
if integrationName == "email" && am.cfg.Limits != nil {
rl := &tenantRateLimits{
tenant: userID,
limits: am.cfg.Limits,
}

return newRateLimitedNotifier(notifier, rl, 10*time.Second, am.rateLimitedNotifications.WithLabelValues(integrationName))
}
return notifier
})
if err != nil {
return nil
}
@@ -344,6 +365,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
am.nflog,
am.state,
)
am.lastPipeline = pipeline
am.dispatcher = dispatch.NewDispatcher(
am.alerts,
dispatch.NewRoute(conf.Route, nil),
@@ -417,10 +439,10 @@ func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) {

// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
// list of receiver config.
func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger) (map[string][]notify.Integration, error) {
func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, notifierWrapper func(string, notify.Notifier) notify.Notifier) (map[string][]notify.Integration, error) {
integrationsMap := make(map[string][]notify.Integration, len(nc))
for _, rcv := range nc {
integrations, err := buildReceiverIntegrations(rcv, tmpl, firewallDialer, logger)
integrations, err := buildReceiverIntegrations(rcv, tmpl, firewallDialer, logger, notifierWrapper)
if err != nil {
return nil, err
}
@@ -432,7 +454,7 @@ func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, firewa
// buildReceiverIntegrations builds a list of integration notifiers off of a
// receiver config.
// Taken from https://github.com/prometheus/alertmanager/blob/94d875f1227b29abece661db1a68c001122d1da5/cmd/alertmanager/main.go#L112-L159.
func buildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger) ([]notify.Integration, error) {
func buildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, wrapper func(string, notify.Notifier) notify.Notifier) ([]notify.Integration, error) {
var (
errs types.MultiError
integrations []notify.Integration
@@ -442,6 +464,7 @@ func buildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, fir
errs.Add(err)
return
}
n = wrapper(name, n)
integrations = append(integrations, notify.NewIntegration(n, rs, name, i))
}
)
@@ -526,3 +549,16 @@ func (p firewallDialerConfigProvider) BlockCIDRNetworks() []flagext.CIDR {
func (p firewallDialerConfigProvider) BlockPrivateAddresses() bool {
return p.limits.AlertmanagerReceiversBlockPrivateAddresses(p.userID)
}

type tenantRateLimits struct {
tenant string
limits Limits
}

func (t *tenantRateLimits) RateLimit() rate.Limit {
return t.limits.EmailNotificationRateLimit(t.tenant)
}

func (t *tenantRateLimits) Burst() int {
return t.limits.EmailNotificationBurst(t.tenant)
}
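For readers who have not used golang.org/x/time/rate before, the following standalone sketch (not part of this PR) shows the token-bucket semantics that the RateLimit and Burst values above feed into: the burst is how many notifications can go out back to back, the limit is the sustained rate, and rate.Inf switches limiting off.

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// 2 notifications/sec sustained, at most 3 back to back.
	l := rate.NewLimiter(rate.Limit(2), 3)
	for i := 0; i < 4; i++ {
		fmt.Println(l.Allow()) // true, true, true, false
	}

	// A burst of 0 with a finite limit never lets anything through...
	fmt.Println(rate.NewLimiter(rate.Limit(2), 0).Allow()) // false

	// ...unless the limit is rate.Inf, which ignores the burst entirely.
	fmt.Println(rate.NewLimiter(rate.Inf, 0).Allow()) // true
}
```

This is why the documentation above can state that a burst size of 0 blocks all email notifications except when rate limiting is disabled, which the Limits implementation expresses as rate.Inf.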
9 changes: 9 additions & 0 deletions pkg/alertmanager/alertmanager_metrics.go
@@ -58,6 +58,8 @@ type alertmanagerMetrics struct {
initialSyncDuration *prometheus.Desc
persistTotal *prometheus.Desc
persistFailed *prometheus.Desc

notificationRateLimited *prometheus.Desc
}

func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -203,6 +205,10 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
"cortex_alertmanager_state_persist_failed_total",
"Number of times we have failed to persist the running state to storage.",
nil, nil),
notificationRateLimited: prometheus.NewDesc(
"cortex_alertmanager_notification_rate_limited_total",
"Total number of rate-limited notifications per integration.",
[]string{"user", "integration"}, nil),
}
}

@@ -252,6 +258,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.initialSyncDuration
out <- m.persistTotal
out <- m.persistFailed
out <- m.notificationRateLimited
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -297,4 +304,6 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfHistograms(out, m.initialSyncDuration, "alertmanager_state_initial_sync_duration_seconds")
data.SendSumOfCounters(out, m.persistTotal, "alertmanager_state_persist_total")
data.SendSumOfCounters(out, m.persistFailed, "alertmanager_state_persist_failed_total")

data.SendSumOfCountersPerUserWithLabels(out, m.notificationRateLimited, "alertmanager_notification_rate_limited_total", "integration")
}
12 changes: 12 additions & 0 deletions pkg/alertmanager/multitenant.go
@@ -26,6 +26,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"

"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
@@ -222,6 +223,17 @@ type Limits interface {
// AlertmanagerReceiversBlockPrivateAddresses returns true if private addresses should be blocked
// in the Alertmanager receivers for the given user.
AlertmanagerReceiversBlockPrivateAddresses(user string) bool

// EmailNotificationRateLimit returns limit used by rate-limiter. If set to 0, no emails are allowed.
// rate.Inf = all emails are allowed.
//
// Note that negative or zero values specified by the user are translated to rate.Limit by Overrides,
// and may have a different meaning there.
EmailNotificationRateLimit(tenant string) rate.Limit

// EmailNotificationBurst returns burst-size for rate limiter. If 0, no notifications are allowed except
// when limit == rate.Inf.
EmailNotificationBurst(tenant string) int
}

// A MultitenantAlertmanager manages Alertmanager instances for multiple
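The Overrides implementation that performs the translation mentioned in the comment is not part of this diff. The sketch below only illustrates the mapping implied by the documentation and the comment: the user-facing flag uses 0 for "disabled" and negative values for "block everything", while the limiter uses rate.Inf and 0 respectively. The package and function names are invented.

```go
package validation // hypothetical location; the real Overrides live elsewhere

import "golang.org/x/time/rate"

// emailRateLimit sketches the translation from the user-facing flag value to the
// rate.Limit consumed by the Limits interface: the flag uses 0 for "disabled" and
// negatives for "block everything", while the limiter uses rate.Inf and 0 for those.
func emailRateLimit(userConfiguredLimit float64) rate.Limit {
	switch {
	case userConfiguredLimit < 0:
		return rate.Limit(0) // negative value: no emails are allowed
	case userConfiguredLimit == 0:
		return rate.Inf // zero: rate limiting disabled
	default:
		return rate.Limit(userConfiguredLimit)
	}
}
```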
79 changes: 79 additions & 0 deletions pkg/alertmanager/multitenant_test.go
@@ -21,6 +21,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
@@ -32,6 +33,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"golang.org/x/time/rate"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
@@ -1707,6 +1709,62 @@ func TestStoreTemplateFile(t *testing.T) {
require.False(t, changed)
}

func TestMultitenantAlertmanager_verifyRateLimitedEmailConfig(t *testing.T) {
ctx := context.Background()

config := `global:
  resolve_timeout: 1m
  smtp_require_tls: false

route:
  receiver: 'email'

receivers:
- name: 'email'
  email_configs:
  - to: [email protected]
    from: [email protected]
    smarthost: smtp:2525
`

// Run this test using a real storage client.
store := prepareInMemoryAlertStore()
require.NoError(t, store.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
User: "user",
RawConfig: config,
Templates: []*alertspb.TemplateDesc{},
}))

limits := mockAlertManagerLimits{
emailNotificationRateLimit: 0,
emailNotificationBurst: 0,
}

reg := prometheus.NewPedanticRegistry()
cfg := mockAlertmanagerConfig(t)
am, err := createMultitenantAlertmanager(cfg, nil, nil, store, nil, limits, log.NewNopLogger(), reg)
require.NoError(t, err)

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Len(t, am.alertmanagers, 1)

am.alertmanagersMtx.Lock()
uam := am.alertmanagers["user"]
am.alertmanagersMtx.Unlock()

require.NotNil(t, uam)

ctx = notify.WithReceiverName(ctx, "email")
ctx = notify.WithGroupKey(ctx, "key")
ctx = notify.WithRepeatInterval(ctx, time.Minute)

// Verify that rate-limiter is in place for email notifier.
_, _, err = uam.lastPipeline.Exec(ctx, log.NewNopLogger(), &types.Alert{})
require.NotNil(t, err)
require.Contains(t, err.Error(), errRateLimited.Error())
}

type passthroughAlertmanagerClient struct {
server alertmanagerpb.AlertmanagerServer
}
@@ -1755,3 +1813,24 @@ func (f *passthroughAlertmanagerClientPool) GetClientFor(addr string) (Client, e
}
return Client(&passthroughAlertmanagerClient{s}), nil
}

type mockAlertManagerLimits struct {
emailNotificationRateLimit rate.Limit
emailNotificationBurst int
}

func (m mockAlertManagerLimits) AlertmanagerReceiversBlockCIDRNetworks(user string) []flagext.CIDR {
panic("implement me")
}

func (m mockAlertManagerLimits) AlertmanagerReceiversBlockPrivateAddresses(user string) bool {
panic("implement me")
}

func (m mockAlertManagerLimits) EmailNotificationRateLimit(_ string) rate.Limit {
return m.emailNotificationRateLimit
}

func (m mockAlertManagerLimits) EmailNotificationBurst(_ string) int {
return m.emailNotificationBurst
}
65 changes: 65 additions & 0 deletions pkg/alertmanager/rate_limited_notifier.go
@@ -0,0 +1,65 @@
package alertmanager

import (
"context"
"errors"
"time"

"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/time/rate"
)

type rateLimits interface {
RateLimit() rate.Limit
Burst() int
}

type rateLimitedNotifier struct {
upstream notify.Notifier
counter prometheus.Counter

limiter *rate.Limiter
limits rateLimits

recheckInterval time.Duration
recheckAt atomic.Int64 // unix nanoseconds timestamp
}

func newRateLimitedNotifier(upstream notify.Notifier, limits rateLimits, recheckInterval time.Duration, counter prometheus.Counter) *rateLimitedNotifier {
return &rateLimitedNotifier{
upstream: upstream,
counter: counter,
limits: limits,
limiter: rate.NewLimiter(limits.RateLimit(), limits.Burst()),
recheckInterval: recheckInterval,
}
}

var errRateLimited = errors.New("failed to notify due to rate limits")

func (r *rateLimitedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
now := time.Now()
if now.UnixNano() >= r.recheckAt.Load() {
if limit := r.limits.RateLimit(); r.limiter.Limit() != limit {
r.limiter.SetLimitAt(now, limit)
}

if burst := r.limits.Burst(); r.limiter.Burst() != burst {
r.limiter.SetBurstAt(now, burst)
}

r.recheckAt.Store(now.UnixNano() + r.recheckInterval.Nanoseconds())
}

// This counts as a single notification, no matter how many alerts there are in it.
if !r.limiter.AllowN(now, 1) {
r.counter.Inc()
// Don't retry this notification later.
return false, errRateLimited
}

return r.upstream.Notify(ctx, alerts...)
Contributor:

I think we should cancel the limiter reservation if the upstream returns an error. WDYT?

Contributor Author:

Hmm, I'm not sure. These limits will already be quite high. If a user is hitting them, it's likely that the user is doing something wrong or bad.

Imagine someone abusing Alertmanager notifications to send a lot of requests to a website, which starts crashing and returning 500. This can look like a failed notification – if we cancel the reservation, it allows the bad actor to keep sending more requests. (This PR only deals with emails, but we will reuse this for other integrations.)

WDYT?

Contributor:

I see your point and your example makes sense. I'm also thinking about the opposite case: a legitimate receiver backend server is down, and retries hit the rate limit even though no notification has been successfully delivered. However, since we can't distinguish the two cases, it's probably safer to keep the current logic, as you suggest.

Contributor:

I agree with @pstibrany that we should count against the limit whether the notification succeeds or fails, because doing it the other way opens up possibilities for abuse or for broken things to make it through.

}
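To make the wrapper's behaviour concrete, here is a small test-style sketch against the code above. The fake notifier, the fixedLimits helper and the test name are invented for illustration. With a burst of 1, the first notification reaches the upstream notifier, while an immediate second one fails with errRateLimited and only increments the counter; the integration test in multitenant_test.go asserts the same property through the full notification pipeline.

```go
package alertmanager

import (
	"context"
	"testing"
	"time"

	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
	"golang.org/x/time/rate"
)

// fixedLimits is a stand-in for tenantRateLimits that returns constant values.
type fixedLimits struct {
	limit rate.Limit
	burst int
}

func (f fixedLimits) RateLimit() rate.Limit { return f.limit }
func (f fixedLimits) Burst() int            { return f.burst }

// countingNotifier pretends to be the upstream email integration.
type countingNotifier struct{ calls int }

func (c *countingNotifier) Notify(_ context.Context, _ ...*types.Alert) (bool, error) {
	c.calls++
	return false, nil
}

func TestRateLimitedNotifierSketch(t *testing.T) {
	upstream := &countingNotifier{}
	counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "rate_limited_total"})

	// 1 notification/sec with a burst of 1.
	n := newRateLimitedNotifier(upstream, fixedLimits{limit: 1, burst: 1}, 10*time.Second, counter)

	_, err := n.Notify(context.Background(), &types.Alert{})
	require.NoError(t, err) // first notification is allowed and reaches the upstream notifier

	_, err = n.Notify(context.Background(), &types.Alert{})
	require.Equal(t, errRateLimited, err) // immediate second one is rate-limited
	require.Equal(t, 1, upstream.calls)
}
```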