
Commit 0595579

Alertmanager: Rate limit email notifier (#4135)

* Introduce rate-limit for sending email notifications from alertmanager.
* Don't retry failed rate-limited notifications.
* Added test to verify that email notifier is used with rate limits.
* Improve documentation.
* CHANGELOG.md
* Rename yaml fields, and add unit test.
* Fix documentation.
* Moved changelog entry.

Signed-off-by: Peter Štibraný <[email protected]>

1 parent 926a691 · commit 0595579

10 files changed: +384 −4 lines changed

CHANGELOG.md (+1)

@@ -16,6 +16,7 @@
   * `cortex_alertmanager_state_persist_total`
   * `cortex_alertmanager_state_persist_failed_total`
 * [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=<n>` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124
+* [FEATURE] Alertmanager: Added rate-limits to email notifier. Rate limits can be configured using `-alertmanager.email-notification-rate-limit` and `-alertmanager.email-notification-burst-size`. These limits are applied on individual alertmanagers. Rate-limited email notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135
 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128

 ## 1.9.0 in progress

docs/configuration/config-file-reference.md (+11)

@@ -4106,6 +4106,17 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # and local multicast addresses.
 # CLI flag: -alertmanager.receivers-firewall-block-private-addresses
 [alertmanager_receivers_firewall_block_private_addresses: <boolean> | default = false]
+
+# Per-user rate limit for sending email notifications from Alertmanager in
+# emails/sec. 0 = rate limit disabled. Negative value = no emails are allowed.
+# CLI flag: -alertmanager.email-notification-rate-limit
+[alertmanager_email_notification_rate_limit: <float> | default = 0]
+
+# Per-user burst size for email notifications. If set to 0, no email
+# notifications will be sent, unless rate-limit is disabled, in which case all
+# email notifications are allowed.
+# CLI flag: -alertmanager.email-notification-burst-size
+[alertmanager_email_notification_burst_size: <int> | default = 1]
 ```

 ### `redis_config`
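
For reference, a minimal sketch of how these options might be set in a Cortex configuration file. This assumes the standard top-level `limits` block; the values are illustrative, not defaults:

```yaml
# Hypothetical example: allow on average one email notification every 10 seconds
# per tenant (0.1 emails/sec), with bursts of up to 5 emails.
limits:
  alertmanager_email_notification_rate_limit: 0.1
  alertmanager_email_notification_burst_size: 5
```

The same values can be passed as `-alertmanager.email-notification-rate-limit=0.1` and `-alertmanager.email-notification-burst-size=5`, as documented above.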

pkg/alertmanager/alertmanager.go (+40 −4)

@@ -43,6 +43,7 @@ import (
 	commoncfg "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/route"
+	"golang.org/x/time/rate"

 	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -99,6 +100,9 @@ type Alertmanager struct {
 	mux      *http.ServeMux
 	registry *prometheus.Registry

+	// Pipeline created during last ApplyConfig call. Used for testing only.
+	lastPipeline notify.Stage
+
 	// The Dispatcher is the only component we need to recreate when we call ApplyConfig.
 	// Given its metrics don't have any variable labels we need to re-use the same metrics.
 	dispatcherMetrics *dispatch.DispatcherMetrics
@@ -107,6 +111,8 @@
 	// Further, in upstream AM, this metric is handled using the config coordinator which we don't use
 	// hence we need to generate the metric ourselves.
 	configHashMetric prometheus.Gauge
+
+	rateLimitedNotifications *prometheus.CounterVec
 }

 var (
@@ -155,6 +161,11 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 			Name: "alertmanager_config_hash",
 			Help: "Hash of the currently loaded alertmanager configuration.",
 		}),
+
+		rateLimitedNotifications: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "alertmanager_notification_rate_limited_total",
+			Help: "Number of rate-limited notifications per integration.",
+		}, []string{"integration"}), // "integration" is consistent with other alertmanager metrics.
 	}

 	am.registry = reg
@@ -325,7 +336,17 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
 	// Create a firewall bound to the per-tenant config.
 	firewallDialer := util_net.NewFirewallDialer(newFirewallDialerConfigProvider(userID, am.cfg.Limits))

-	integrationsMap, err := buildIntegrationsMap(conf.Receivers, tmpl, firewallDialer, am.logger)
+	integrationsMap, err := buildIntegrationsMap(conf.Receivers, tmpl, firewallDialer, am.logger, func(integrationName string, notifier notify.Notifier) notify.Notifier {
+		if integrationName == "email" && am.cfg.Limits != nil {
+			rl := &tenantRateLimits{
+				tenant: userID,
+				limits: am.cfg.Limits,
+			}
+
+			return newRateLimitedNotifier(notifier, rl, 10*time.Second, am.rateLimitedNotifications.WithLabelValues(integrationName))
+		}
+		return notifier
+	})
 	if err != nil {
 		return nil
 	}
@@ -344,6 +365,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
 		am.nflog,
 		am.state,
 	)
+	am.lastPipeline = pipeline
 	am.dispatcher = dispatch.NewDispatcher(
 		am.alerts,
 		dispatch.NewRoute(conf.Route, nil),
@@ -417,10 +439,10 @@ func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) {

 // buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
 // list of receiver config.
-func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger) (map[string][]notify.Integration, error) {
+func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, notifierWrapper func(string, notify.Notifier) notify.Notifier) (map[string][]notify.Integration, error) {
 	integrationsMap := make(map[string][]notify.Integration, len(nc))
 	for _, rcv := range nc {
-		integrations, err := buildReceiverIntegrations(rcv, tmpl, firewallDialer, logger)
+		integrations, err := buildReceiverIntegrations(rcv, tmpl, firewallDialer, logger, notifierWrapper)
 		if err != nil {
 			return nil, err
 		}
@@ -432,7 +454,7 @@ func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, firewa
 // buildReceiverIntegrations builds a list of integration notifiers off of a
 // receiver config.
 // Taken from https://github.com/prometheus/alertmanager/blob/94d875f1227b29abece661db1a68c001122d1da5/cmd/alertmanager/main.go#L112-L159.
-func buildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger) ([]notify.Integration, error) {
+func buildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, wrapper func(string, notify.Notifier) notify.Notifier) ([]notify.Integration, error) {
 	var (
 		errs         types.MultiError
 		integrations []notify.Integration
@@ -442,6 +464,7 @@
 			errs.Add(err)
 			return
 		}
+		n = wrapper(name, n)
 		integrations = append(integrations, notify.NewIntegration(n, rs, name, i))
 	}
 	)
@@ -526,3 +549,16 @@ func (p firewallDialerConfigProvider) BlockCIDRNetworks() []flagext.CIDR {
 func (p firewallDialerConfigProvider) BlockPrivateAddresses() bool {
 	return p.limits.AlertmanagerReceiversBlockPrivateAddresses(p.userID)
 }
+
+type tenantRateLimits struct {
+	tenant string
+	limits Limits
+}
+
+func (t *tenantRateLimits) RateLimit() rate.Limit {
+	return t.limits.EmailNotificationRateLimit(t.tenant)
+}
+
+func (t *tenantRateLimits) Burst() int {
+	return t.limits.EmailNotificationBurst(t.tenant)
+}
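
The `notifierWrapper` argument threaded through `buildIntegrationsMap` and `buildReceiverIntegrations` is a plain decorator hook: it receives the integration name and the freshly built notifier, and may return it wrapped. A minimal standalone sketch of the same pattern, with a hypothetical `loggingNotifier` standing in for the rate limiter:

```go
package example

import (
	"context"
	"fmt"

	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
)

// loggingNotifier is a hypothetical decorator illustrating the wrapping
// pattern used above for the email integration.
type loggingNotifier struct {
	upstream notify.Notifier
	name     string
}

// Notify logs the notification and then delegates to the wrapped notifier.
func (l *loggingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
	fmt.Printf("sending %d alert(s) via %s\n", len(alerts), l.name)
	return l.upstream.Notify(ctx, alerts...)
}

// wrap has the notifierWrapper signature: it decorates only the integrations
// it cares about and passes everything else through unchanged.
func wrap(integrationName string, n notify.Notifier) notify.Notifier {
	if integrationName == "email" {
		return &loggingNotifier{upstream: n, name: integrationName}
	}
	return n
}
```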

pkg/alertmanager/alertmanager_metrics.go (+9)

@@ -58,6 +58,8 @@ type alertmanagerMetrics struct {
 	initialSyncDuration *prometheus.Desc
 	persistTotal        *prometheus.Desc
 	persistFailed       *prometheus.Desc
+
+	notificationRateLimited *prometheus.Desc
 }

 func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -203,6 +205,10 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
 			"cortex_alertmanager_state_persist_failed_total",
 			"Number of times we have failed to persist the running state to storage.",
 			nil, nil),
+		notificationRateLimited: prometheus.NewDesc(
+			"cortex_alertmanager_notification_rate_limited_total",
+			"Total number of rate-limited notifications per integration.",
+			[]string{"user", "integration"}, nil),
 	}
 }

@@ -252,6 +258,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
 	out <- m.initialSyncDuration
 	out <- m.persistTotal
 	out <- m.persistFailed
+	out <- m.notificationRateLimited
 }

 func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -297,4 +304,6 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
 	data.SendSumOfHistograms(out, m.initialSyncDuration, "alertmanager_state_initial_sync_duration_seconds")
 	data.SendSumOfCounters(out, m.persistTotal, "alertmanager_state_persist_total")
 	data.SendSumOfCounters(out, m.persistFailed, "alertmanager_state_persist_failed_total")
+
+	data.SendSumOfCountersPerUserWithLabels(out, m.notificationRateLimited, "alertmanager_notification_rate_limited_total", "integration")
 }

pkg/alertmanager/multitenant.go (+12)

@@ -26,6 +26,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/httpgrpc/server"
 	"github.com/weaveworks/common/user"
+	"golang.org/x/time/rate"

 	"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
 	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
@@ -222,6 +223,17 @@
 	// AlertmanagerReceiversBlockPrivateAddresses returns true if private addresses should be blocked
 	// in the Alertmanager receivers for the given user.
 	AlertmanagerReceiversBlockPrivateAddresses(user string) bool
+
+	// EmailNotificationRateLimit returns the limit used by the rate-limiter. If set to 0, no emails are allowed.
+	// rate.Inf = all emails are allowed.
+	//
+	// Note that negative and zero values specified by the user are translated to rate.Limit by Overrides,
+	// and may have a different meaning there.
+	EmailNotificationRateLimit(tenant string) rate.Limit
+
+	// EmailNotificationBurst returns the burst size for the rate limiter. If 0, no notifications are allowed except
+	// when limit == rate.Inf.
+	EmailNotificationBurst(tenant string) int
 }

 // A MultitenantAlertmanager manages Alertmanager instances for multiple
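
The interface comments above distinguish the user-facing flag semantics (0 = disabled, negative = no emails) from the `rate.Limit` semantics consumed by the notifier (`rate.Inf` = unlimited, 0 = no emails). The `Overrides` code performing that translation is not part of this diff; a sketch of the documented mapping, with a hypothetical helper name:

```go
package example

import "golang.org/x/time/rate"

// emailRateLimitToRateLimit illustrates the documented translation from the
// flag value (emails/sec) to the rate.Limit handed to the limiter:
// 0 disables rate limiting (rate.Inf), a negative value blocks all emails
// (limit 0), and any positive value is used as-is.
func emailRateLimitToRateLimit(flagValue float64) rate.Limit {
	if flagValue == 0 {
		return rate.Inf
	}
	if flagValue < 0 {
		return 0
	}
	return rate.Limit(flagValue)
}
```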

pkg/alertmanager/multitenant_test.go (+79)

@@ -21,6 +21,7 @@ import (

 	"github.com/go-kit/kit/log"
 	"github.com/prometheus/alertmanager/cluster/clusterpb"
+	"github.com/prometheus/alertmanager/notify"
 	"github.com/prometheus/alertmanager/pkg/labels"
 	"github.com/prometheus/alertmanager/types"
 	"github.com/prometheus/client_golang/prometheus"
@@ -32,6 +33,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
 	"go.uber.org/atomic"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc"

 	"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
@@ -1707,6 +1709,62 @@ func TestStoreTemplateFile(t *testing.T) {
 	require.False(t, changed)
 }

+func TestMultitenantAlertmanager_verifyRateLimitedEmailConfig(t *testing.T) {
+	ctx := context.Background()
+
+	config := `global:
+  resolve_timeout: 1m
+  smtp_require_tls: false
+
+route:
+  receiver: 'email'
+
+receivers:
+- name: 'email'
+  email_configs:
+  - to: [email protected]
+    from: [email protected]
+    smarthost: smtp:2525
+`
+
+	// Run this test using a real storage client.
+	store := prepareInMemoryAlertStore()
+	require.NoError(t, store.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
+		User:      "user",
+		RawConfig: config,
+		Templates: []*alertspb.TemplateDesc{},
+	}))
+
+	limits := mockAlertManagerLimits{
+		emailNotificationRateLimit: 0,
+		emailNotificationBurst:     0,
+	}
+
+	reg := prometheus.NewPedanticRegistry()
+	cfg := mockAlertmanagerConfig(t)
+	am, err := createMultitenantAlertmanager(cfg, nil, nil, store, nil, limits, log.NewNopLogger(), reg)
+	require.NoError(t, err)
+
+	err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+	require.NoError(t, err)
+	require.Len(t, am.alertmanagers, 1)
+
+	am.alertmanagersMtx.Lock()
+	uam := am.alertmanagers["user"]
+	am.alertmanagersMtx.Unlock()
+
+	require.NotNil(t, uam)
+
+	ctx = notify.WithReceiverName(ctx, "email")
+	ctx = notify.WithGroupKey(ctx, "key")
+	ctx = notify.WithRepeatInterval(ctx, time.Minute)
+
+	// Verify that rate-limiter is in place for email notifier.
+	_, _, err = uam.lastPipeline.Exec(ctx, log.NewNopLogger(), &types.Alert{})
+	require.NotNil(t, err)
+	require.Contains(t, err.Error(), errRateLimited.Error())
+}
+
 type passthroughAlertmanagerClient struct {
 	server alertmanagerpb.AlertmanagerServer
 }
@@ -1755,3 +1813,24 @@ func (f *passthroughAlertmanagerClientPool) GetClientFor(addr string) (Client, e
 	}
 	return Client(&passthroughAlertmanagerClient{s}), nil
 }
+
+type mockAlertManagerLimits struct {
+	emailNotificationRateLimit rate.Limit
+	emailNotificationBurst     int
+}
+
+func (m mockAlertManagerLimits) AlertmanagerReceiversBlockCIDRNetworks(user string) []flagext.CIDR {
+	panic("implement me")
+}
+
+func (m mockAlertManagerLimits) AlertmanagerReceiversBlockPrivateAddresses(user string) bool {
+	panic("implement me")
+}
+
+func (m mockAlertManagerLimits) EmailNotificationRateLimit(_ string) rate.Limit {
+	return m.emailNotificationRateLimit
+}
+
+func (m mockAlertManagerLimits) EmailNotificationBurst(_ string) int {
+	return m.emailNotificationBurst
+}
pkg/alertmanager/rate_limited_notifier.go (new file, +65)

@@ -0,0 +1,65 @@
+package alertmanager
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/prometheus/alertmanager/notify"
+	"github.com/prometheus/alertmanager/types"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/atomic"
+	"golang.org/x/time/rate"
+)
+
+type rateLimits interface {
+	RateLimit() rate.Limit
+	Burst() int
+}
+
+type rateLimitedNotifier struct {
+	upstream notify.Notifier
+	counter  prometheus.Counter
+
+	limiter *rate.Limiter
+	limits  rateLimits
+
+	recheckInterval time.Duration
+	recheckAt       atomic.Int64 // unix nanoseconds timestamp
+}
+
+func newRateLimitedNotifier(upstream notify.Notifier, limits rateLimits, recheckInterval time.Duration, counter prometheus.Counter) *rateLimitedNotifier {
+	return &rateLimitedNotifier{
+		upstream:        upstream,
+		counter:         counter,
+		limits:          limits,
+		limiter:         rate.NewLimiter(limits.RateLimit(), limits.Burst()),
+		recheckInterval: recheckInterval,
+	}
+}
+
+var errRateLimited = errors.New("failed to notify due to rate limits")
+
+func (r *rateLimitedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+	now := time.Now()
+	if now.UnixNano() >= r.recheckAt.Load() {
+		if limit := r.limits.RateLimit(); r.limiter.Limit() != limit {
+			r.limiter.SetLimitAt(now, limit)
+		}
+
+		if burst := r.limits.Burst(); r.limiter.Burst() != burst {
+			r.limiter.SetBurstAt(now, burst)
+		}
+
+		r.recheckAt.Store(now.UnixNano() + r.recheckInterval.Nanoseconds())
+	}
+
+	// This counts as a single notification, no matter how many alerts it contains.
+	if !r.limiter.AllowN(now, 1) {
+		r.counter.Inc()
+		// Don't retry this notification later.
+		return false, errRateLimited
+	}
+
+	return r.upstream.Notify(ctx, alerts...)
+}
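
A minimal usage sketch of the notifier above, assuming it is wired up from within the same package; `fixedLimits` is a hypothetical `rateLimits` implementation returning constants instead of per-tenant overrides:

```go
package alertmanager

import (
	"time"

	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/time/rate"
)

// fixedLimits returns constant limits, standing in for tenantRateLimits.
type fixedLimits struct {
	limit rate.Limit
	burst int
}

func (f fixedLimits) RateLimit() rate.Limit { return f.limit }
func (f fixedLimits) Burst() int            { return f.burst }

// wrapWithFixedRateLimit allows on average one notification every 10 seconds
// (0.1/sec) with bursts of up to 2, and re-reads the limits every 10 seconds.
func wrapWithFixedRateLimit(upstream notify.Notifier, counter prometheus.Counter) notify.Notifier {
	return newRateLimitedNotifier(upstream, fixedLimits{limit: 0.1, burst: 2}, 10*time.Second, counter)
}
```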
