Skip to content

Commit f686ff3

Browse files
author
Julien Pivotto
authored
Merge pull request #2569 from pstibrany/store-callback
Added the possibility to pass a callback to `mem.Alerts`, which is useful for implementing limits on alerts.
2 parents 243accb + 15ea220 commit f686ff3

File tree

7 files changed

+151
-14
lines changed

7 files changed

+151
-14
lines changed

cmd/alertmanager/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ func run() int {
321321
go peer.Settle(ctx, *gossipInterval*10)
322322
}
323323

324-
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, logger)
324+
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger)
325325
if err != nil {
326326
level.Error(logger).Log("err", err)
327327
return 1

dispatch/dispatch_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ route:
365365
logger := log.NewNopLogger()
366366
route := NewRoute(conf.Route, nil)
367367
marker := types.NewMarker(prometheus.NewRegistry())
368-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
368+
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger)
369369
if err != nil {
370370
t.Fatal(err)
371371
}
@@ -527,7 +527,7 @@ func newAlert(labels model.LabelSet) *types.Alert {
527527
func TestDispatcherRace(t *testing.T) {
528528
logger := log.NewNopLogger()
529529
marker := types.NewMarker(prometheus.NewRegistry())
530-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
530+
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger)
531531
if err != nil {
532532
t.Fatal(err)
533533
}
@@ -544,7 +544,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
544544

545545
logger := log.NewNopLogger()
546546
marker := types.NewMarker(prometheus.NewRegistry())
547-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
547+
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger)
548548
if err != nil {
549549
t.Fatal(err)
550550
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ require (
3030
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546
3131
github.com/stretchr/testify v1.7.0
3232
github.com/xlab/treeprint v1.1.0
33+
go.uber.org/atomic v1.5.0
3334
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
3435
golang.org/x/tools v0.1.0
3536
gopkg.in/alecthomas/kingpin.v2 v2.2.6

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,7 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
533533
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
534534
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
535535
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
536+
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
536537
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
537538
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
538539
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@@ -560,6 +561,7 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk
560561
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
561562
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
562563
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
564+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
563565
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
564566
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
565567
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=

provider/mem/mem.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -38,30 +38,52 @@ type Alerts struct {
3838
listeners map[int]listeningAlerts
3939
next int
4040

41+
callback AlertStoreCallback
42+
4143
logger log.Logger
4244
}
4345

46+
type AlertStoreCallback interface {
47+
// PreStore is called before the alert is stored into the store. If this method returns an error,
48+
// the alert is not stored.
49+
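// The existing flag indicates whether the alert already existed in the store (and is only being updated) or not.
50+
// If the alert already existed and its activity range overlaps that of the new alert, the alert passed to PreStore is the result of merging the existing alert with the new alert; otherwise it is the new alert as given.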
51+
PreStore(alert *types.Alert, existing bool) error
52+
53+
// PostStore is called after the alert has been put into the store.
54+
PostStore(alert *types.Alert, existing bool)
55+
56+
// PostDelete is called after the alert has been removed from the store due to alert garbage collection.
57+
PostDelete(alert *types.Alert)
58+
}
59+
4460
type listeningAlerts struct {
4561
alerts chan *types.Alert
4662
done chan struct{}
4763
}
4864

4965
// NewAlerts returns a new alert provider.
50-
func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) {
66+
func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, alertCallback AlertStoreCallback, l log.Logger) (*Alerts, error) {
67+
if alertCallback == nil {
68+
alertCallback = noopCallback{}
69+
}
70+
5171
ctx, cancel := context.WithCancel(ctx)
5272
a := &Alerts{
5373
alerts: store.NewAlerts(),
5474
cancel: cancel,
5575
listeners: map[int]listeningAlerts{},
5676
next: 0,
5777
logger: log.With(l, "component", "provider"),
78+
callback: alertCallback,
5879
}
5980
a.alerts.SetGCCallback(func(alerts []*types.Alert) {
6081
for _, alert := range alerts {
6182
// As we don't persist alerts, we no longer consider them after
6283
// they are resolved. Alerts waiting for resolved notifications are
6384
// held in memory in aggregation groups redundantly.
6485
m.Delete(alert.Fingerprint())
86+
a.callback.PostDelete(alert)
6587
}
6688

6789
a.mtx.Lock()
@@ -148,25 +170,35 @@ func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
148170

149171
// Put adds the given alert to the set.
150172
func (a *Alerts) Put(alerts ...*types.Alert) error {
151-
152173
for _, alert := range alerts {
153174
fp := alert.Fingerprint()
154175

176+
existing := false
177+
155178
// Check that there's an alert existing within the store before
156179
// trying to merge.
157180
if old, err := a.alerts.Get(fp); err == nil {
181+
existing = true
182+
158183
// Merge alerts if there is an overlap in activity range.
159184
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
160185
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
161186
alert = old.Merge(alert)
162187
}
163188
}
164189

190+
if err := a.callback.PreStore(alert, existing); err != nil {
191+
level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err)
192+
continue
193+
}
194+
165195
if err := a.alerts.Set(alert); err != nil {
166196
level.Error(a.logger).Log("msg", "error on set alert", "err", err)
167197
continue
168198
}
169199

200+
a.callback.PostStore(alert, existing)
201+
170202
a.mtx.Lock()
171203
for _, l := range a.listeners {
172204
select {
@@ -179,3 +211,9 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
179211

180212
return nil
181213
}
214+
215+
type noopCallback struct{}
216+
217+
func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
218+
func (n noopCallback) PostStore(_ *types.Alert, _ bool) {}
219+
func (n noopCallback) PostDelete(_ *types.Alert) {}

provider/mem/mem_test.go

+103-7
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525

2626
"github.com/go-kit/kit/log"
2727
"github.com/kylelemons/godebug/pretty"
28-
"github.com/prometheus/alertmanager/store"
29-
"github.com/prometheus/alertmanager/types"
3028
"github.com/prometheus/client_golang/prometheus"
3129
"github.com/prometheus/common/model"
3230
"github.com/stretchr/testify/require"
31+
"go.uber.org/atomic"
32+
33+
"github.com/prometheus/alertmanager/store"
34+
"github.com/prometheus/alertmanager/types"
3335
)
3436

3537
var (
@@ -85,7 +87,7 @@ func init() {
8587
// a listener cannot unsubscribe as the lock is held by `alerts.Lock`.
8688
func TestAlertsSubscribePutStarvation(t *testing.T) {
8789
marker := types.NewMarker(prometheus.NewRegistry())
88-
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
90+
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
8991
if err != nil {
9092
t.Fatal(err)
9193
}
@@ -136,7 +138,7 @@ func TestAlertsSubscribePutStarvation(t *testing.T) {
136138

137139
func TestAlertsPut(t *testing.T) {
138140
marker := types.NewMarker(prometheus.NewRegistry())
139-
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
141+
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
140142
if err != nil {
141143
t.Fatal(err)
142144
}
@@ -164,7 +166,7 @@ func TestAlertsSubscribe(t *testing.T) {
164166

165167
ctx, cancel := context.WithCancel(context.Background())
166168
defer cancel()
167-
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, log.NewNopLogger())
169+
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
168170
if err != nil {
169171
t.Fatal(err)
170172
}
@@ -241,7 +243,7 @@ func TestAlertsSubscribe(t *testing.T) {
241243

242244
func TestAlertsGetPending(t *testing.T) {
243245
marker := types.NewMarker(prometheus.NewRegistry())
244-
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
246+
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
245247
if err != nil {
246248
t.Fatal(err)
247249
}
@@ -284,7 +286,7 @@ func TestAlertsGetPending(t *testing.T) {
284286

285287
func TestAlertsGC(t *testing.T) {
286288
marker := types.NewMarker(prometheus.NewRegistry())
287-
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, log.NewNopLogger())
289+
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, log.NewNopLogger())
288290
if err != nil {
289291
t.Fatal(err)
290292
}
@@ -316,6 +318,71 @@ func TestAlertsGC(t *testing.T) {
316318
}
317319
}
318320

321+
func TestAlertsStoreCallback(t *testing.T) {
322+
cb := &limitCountCallback{limit: 3}
323+
324+
marker := types.NewMarker(prometheus.NewRegistry())
325+
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, log.NewNopLogger())
326+
if err != nil {
327+
t.Fatal(err)
328+
}
329+
330+
err = alerts.Put(alert1, alert2, alert3)
331+
if err != nil {
332+
t.Fatal(err)
333+
}
334+
if num := cb.alerts.Load(); num != 3 {
335+
t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num)
336+
}
337+
338+
alert1Mod := *alert1
339+
alert1Mod.Annotations = model.LabelSet{"foo": "bar", "new": "test"} // Update annotations for alert1
340+
341+
alert4 := &types.Alert{
342+
Alert: model.Alert{
343+
Labels: model.LabelSet{"bar4": "foo4"},
344+
Annotations: model.LabelSet{"foo4": "bar4"},
345+
StartsAt: t0,
346+
EndsAt: t1,
347+
GeneratorURL: "http://example.com/prometheus",
348+
},
349+
UpdatedAt: t0,
350+
Timeout: false,
351+
}
352+
353+
err = alerts.Put(&alert1Mod, alert4)
354+
// Verify that we failed to put new alert into store (not reported via error, only checked using Load)
355+
if err != nil {
356+
t.Fatalf("unexpected error %v", err)
357+
}
358+
359+
if num := cb.alerts.Load(); num != 3 {
360+
t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num)
361+
}
362+
363+
// But we still managed to update alert1, since the callback doesn't report an error when updating an existing alert.
364+
a, err := alerts.Get(alert1.Fingerprint())
365+
if err != nil {
366+
t.Fatal(err)
367+
}
368+
if !alertsEqual(a, &alert1Mod) {
369+
t.Errorf("Unexpected alert")
370+
t.Fatalf(pretty.Compare(a, &alert1Mod))
371+
}
372+
373+
// Now wait until existing alerts are GC-ed, and make sure that callback was called.
374+
time.Sleep(300 * time.Millisecond)
375+
376+
if num := cb.alerts.Load(); num != 0 {
377+
t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 0, num)
378+
}
379+
380+
err = alerts.Put(alert4)
381+
if err != nil {
382+
t.Fatal(err)
383+
}
384+
}
385+
319386
func alertsEqual(a1, a2 *types.Alert) bool {
320387
if a1 == nil || a2 == nil {
321388
return false
@@ -340,3 +407,32 @@ func alertsEqual(a1, a2 *types.Alert) bool {
340407
}
341408
return a1.Timeout == a2.Timeout
342409
}
410+
411+
type limitCountCallback struct {
412+
alerts atomic.Int32
413+
limit int
414+
}
415+
416+
var errTooManyAlerts = fmt.Errorf("too many alerts")
417+
418+
func (l *limitCountCallback) PreStore(_ *types.Alert, existing bool) error {
419+
if existing {
420+
return nil
421+
}
422+
423+
if int(l.alerts.Load())+1 > l.limit {
424+
return errTooManyAlerts
425+
}
426+
427+
return nil
428+
}
429+
430+
func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
431+
if !existing {
432+
l.alerts.Inc()
433+
}
434+
}
435+
436+
func (l *limitCountCallback) PostDelete(_ *types.Alert) {
437+
l.alerts.Dec()
438+
}

provider/provider.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,6 @@ type Alerts interface {
8383
GetPending() AlertIterator
8484
// Get returns the alert for a given fingerprint.
8585
Get(model.Fingerprint) (*types.Alert, error)
86-
// Put adds the given alert to the set.
86+
// Put adds the given set of alerts to the set.
8787
Put(...*types.Alert) error
8888
}

0 commit comments

Comments
 (0)