Alertmanager: Rate limit email notifier #4135
@@ -21,6 +21,7 @@ import (

```go
	"github.com/go-kit/kit/log"
	"github.com/prometheus/alertmanager/cluster/clusterpb"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/pkg/labels"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/client_golang/prometheus"
```

@@ -32,6 +33,7 @@ import (

```go
	"github.com/weaveworks/common/httpgrpc"
	"github.com/weaveworks/common/user"
	"go.uber.org/atomic"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"

	"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
```

@@ -1707,6 +1709,62 @@ func TestStoreTemplateFile(t *testing.T) {

```go
	require.False(t, changed)
}

func TestMultitenantAlertmanager_verifyRateLimitedEmailConfig(t *testing.T) {
	ctx := context.Background()

	config := `global:
  resolve_timeout: 1m
  smtp_require_tls: false

route:
  receiver: 'email'

receivers:
- name: 'email'
  email_configs:
  - to: [email protected]
    from: [email protected]
    smarthost: smtp:2525
`

	// Run this test using a real storage client.
	store := prepareInMemoryAlertStore()
	require.NoError(t, store.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
		User:      "user",
		RawConfig: config,
		Templates: []*alertspb.TemplateDesc{},
	}))

	limits := mockAlertManagerLimits{
		emailNotificationRateLimit: 0,
		emailNotificationBurst:     0,
	}

	reg := prometheus.NewPedanticRegistry()
	cfg := mockAlertmanagerConfig(t)
	am, err := createMultitenantAlertmanager(cfg, nil, nil, store, nil, limits, log.NewNopLogger(), reg)
	require.NoError(t, err)

	err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
	require.NoError(t, err)
	require.Len(t, am.alertmanagers, 1)

	am.alertmanagersMtx.Lock()
	uam := am.alertmanagers["user"]
	am.alertmanagersMtx.Unlock()

	require.NotNil(t, uam)

	ctx = notify.WithReceiverName(ctx, "email")
	ctx = notify.WithGroupKey(ctx, "key")
	ctx = notify.WithRepeatInterval(ctx, time.Minute)

	// Verify that rate-limiter is in place for email notifier.
	_, _, err = uam.lastPipeline.Exec(ctx, log.NewNopLogger(), &types.Alert{})
	require.NotNil(t, err)
	require.Contains(t, err.Error(), errRateLimited.Error())
}

type passthroughAlertmanagerClient struct {
	server alertmanagerpb.AlertmanagerServer
}
```

@@ -1755,3 +1813,24 @@ func (f *passthroughAlertmanagerClientPool) GetClientFor(addr string) (Client, e

```go
	}
	return Client(&passthroughAlertmanagerClient{s}), nil
}

type mockAlertManagerLimits struct {
	emailNotificationRateLimit rate.Limit
	emailNotificationBurst     int
}

func (m mockAlertManagerLimits) AlertmanagerReceiversBlockCIDRNetworks(user string) []flagext.CIDR {
	panic("implement me")
}

func (m mockAlertManagerLimits) AlertmanagerReceiversBlockPrivateAddresses(user string) bool {
	panic("implement me")
}

func (m mockAlertManagerLimits) EmailNotificationRateLimit(_ string) rate.Limit {
	return m.emailNotificationRateLimit
}

func (m mockAlertManagerLimits) EmailNotificationBurst(_ string) int {
	return m.emailNotificationBurst
}
```
New file (@@ -0,0 +1,65 @@):

```go
package alertmanager

import (
	"context"
	"errors"
	"time"

	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/atomic"
	"golang.org/x/time/rate"
)

type rateLimits interface {
	RateLimit() rate.Limit
	Burst() int
}

type rateLimitedNotifier struct {
	upstream notify.Notifier
	counter  prometheus.Counter

	limiter *rate.Limiter
	limits  rateLimits

	recheckInterval time.Duration
	recheckAt       atomic.Int64 // unix nanoseconds timestamp
}

func newRateLimitedNotifier(upstream notify.Notifier, limits rateLimits, recheckInterval time.Duration, counter prometheus.Counter) *rateLimitedNotifier {
	return &rateLimitedNotifier{
		upstream:        upstream,
		counter:         counter,
		limits:          limits,
		limiter:         rate.NewLimiter(limits.RateLimit(), limits.Burst()),
		recheckInterval: recheckInterval,
	}
}

var errRateLimited = errors.New("failed to notify due to rate limits")

func (r *rateLimitedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
	now := time.Now()
	if now.UnixNano() >= r.recheckAt.Load() {
		if limit := r.limits.RateLimit(); r.limiter.Limit() != limit {
			r.limiter.SetLimitAt(now, limit)
		}

		if burst := r.limits.Burst(); r.limiter.Burst() != burst {
			r.limiter.SetBurstAt(now, burst)
		}

		r.recheckAt.Store(now.UnixNano() + r.recheckInterval.Nanoseconds())
	}

	// This counts as a single notification, no matter how many alerts are in it.
	if !r.limiter.AllowN(now, 1) {
		r.counter.Inc()
		// Don't retry this notification later.
		return false, errRateLimited
	}

	return r.upstream.Notify(ctx, alerts...)
}
```
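How this wrapper gets wired up is not shown in this diff. A minimal sketch of one way to adapt per-tenant limits to the rateLimits interface (tenantLimits and emailNotifierRateLimits are illustrative names, not from the PR; the mockAlertManagerLimits in the test happens to satisfy this shape):

```go
// tenantLimits stands in for whatever type exposes per-tenant limit getters.
type tenantLimits interface {
	EmailNotificationRateLimit(tenant string) rate.Limit
	EmailNotificationBurst(tenant string) int
}

// emailNotifierRateLimits adapts per-tenant limits to the rateLimits
// interface consumed by rateLimitedNotifier.
type emailNotifierRateLimits struct {
	tenant string
	limits tenantLimits
}

func (l emailNotifierRateLimits) RateLimit() rate.Limit {
	return l.limits.EmailNotificationRateLimit(l.tenant)
}

func (l emailNotifierRateLimits) Burst() int {
	return l.limits.EmailNotificationBurst(l.tenant)
}
```

With an adapter like this, an email notifier for tenant "user" could be wrapped as `newRateLimitedNotifier(emailNotifier, emailNotifierRateLimits{tenant: "user", limits: limits}, recheckInterval, counter)`, which is roughly the path the test above exercises end to end.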
Review comment: I think we should cancel the limiter reservation if the upstream returns an error. WDYT?

Reply: Hmm, I'm not sure. These limits will already be quite high; if a user is hitting them, it's likely that the user is doing something wrong or bad. Imagine someone abusing Alertmanager notifications to send a lot of requests to a website, which starts crashing and returning 500. That can look like a failed notification – if we cancel the reservation, it allows a bad actor to keep sending more requests. (This PR only deals with emails, but we will reuse this for other integrations.) WDYT?

Reply: I see your point and your example makes sense. I'm also thinking about the opposite case: a legitimate receiver backend is down, and retries hit the rate limit even though no notification has been successfully delivered. However, since we can't distinguish the two cases, it's probably safer to keep the current logic as you suggest.

Reply: I agree with @pstibrany that we should count against the limit whether the notification succeeds or fails, because doing it the other way opens up possibilities for abuse, or for broken things to make it through.
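For concreteness, the refund-on-error alternative raised at the top of this thread could look roughly like the sketch below, built on rate.Reservation and its Cancel method (the method name notifyRefundOnError is hypothetical; the PR deliberately does not do this):

```go
// Hypothetical variant, discussed and rejected above: take a reservation and
// refund it if the upstream notifier fails, so failed deliveries do not count
// against the tenant's limit.
func (r *rateLimitedNotifier) notifyRefundOnError(ctx context.Context, alerts ...*types.Alert) (bool, error) {
	res := r.limiter.ReserveN(time.Now(), 1)
	if !res.OK() || res.Delay() > 0 {
		// No token available right now; give the reservation back rather
		// than waiting for it. (Cancel is a no-op on a failed reservation.)
		res.Cancel()
		r.counter.Inc()
		return false, errRateLimited
	}

	retry, err := r.upstream.Notify(ctx, alerts...)
	if err != nil {
		// Refunding here is exactly what lets a failing (or abused) upstream
		// bypass the limit, which is why the PR charges the token either way.
		res.Cancel()
	}
	return retry, err
}
```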
Review comment: Are you planning to have separate configuration for each notification type? I think there are some advantages to that in terms of flexibility, but it might make the config a bit complicated if this is extended to all the receivers.

Reply: Yes, that was my plan. Do you suggest using a single rate-limit configuration for all notification types? We could also have one generic rate-limit config for all integrations, with per-integration-type overrides where needed. WDYT?

Would you also suggest using a single rate limiter shared across all notifiers? (I guess not, because e.g. too many webhook notifications could stop email notifications.)
Reply: Ah... we cannot easily distinguish between default values and missing values, so this would be difficult.

Reply: Can we use pointer values in the config? If the pointer is nil, then it has not been set. I haven't checked whether it's doable, just asking.
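A minimal sketch of what that pointer-based config could look like (all type and field names here are illustrative, not taken from the PR):

```go
package alertmanager

import "golang.org/x/time/rate"

// notificationRateLimitConfig is a hypothetical config shape using pointers
// so that an unset override can be told apart from an explicit zero.
type notificationRateLimitConfig struct {
	// Generic limits applied to every integration unless overridden.
	RateLimit rate.Limit `yaml:"rate_limit"`
	Burst     int        `yaml:"burst"`

	// Per-integration overrides: nil means "not set, fall back to the
	// generic limit"; a non-nil pointer is an explicit value, even if zero.
	EmailRateLimit *rate.Limit `yaml:"email_rate_limit"`
	EmailBurst     *int        `yaml:"email_burst"`
}

func (c notificationRateLimitConfig) emailRateLimit() rate.Limit {
	if c.EmailRateLimit != nil {
		return *c.EmailRateLimit
	}
	return c.RateLimit
}
```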
Reply: This is doable, but it could lead to confusing rules when trying to set up the overrides correctly. E.g.:

global limits:
user A:

Now when computing the rate limit for the email integration for "user A", we can either use their shared limits (c) or the global email rate limits (b), but it's unclear which is the better option, and no matter what we choose, we will confuse some people. I would leave that to a separate discussion and PR for now.
Reply: I agree on this. @ranton256, what do you think? Some feedback on this would be great. Thanks!
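To make the ambiguity in the example above concrete: once both per-user generic limits and global per-integration limits exist as optional overrides, the resolver has to pick a precedence order, and either pick will surprise someone. A sketch of one possible "most specific wins" order, using golang.org/x/time/rate as above and entirely hypothetical names (the PR defers this decision):

```go
// resolveEmailRateLimit picks the first limit that is explicitly set, walking
// from the most specific (per-user email override) to the least specific
// (global generic limit). Hypothetical; not part of this PR.
func resolveEmailRateLimit(userEmail, userGeneric, globalEmail *rate.Limit, globalGeneric rate.Limit) rate.Limit {
	for _, l := range []*rate.Limit{userEmail, userGeneric, globalEmail} {
		if l != nil {
			return *l
		}
	}
	return globalGeneric
}
```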