Skip to content

Commit 7304ba0

Browse files
committed
Delete alertmanager state objects from remote storage on user deletion.
The same pattern as for cleaning up local state is used - checking for state objects to clean up whenever the users are synced, and deleting them if the user has no configuration. This means that we perform an additional listing operation as opposed to doing it on demand, when the user configuration is deleted. This does mean that the deletion will be more robust against e.g. the process dying before cleaning up, or transient storage errors causing deletion to fail. Note that a different mechanism is used to check if a remote user state should be deleted, compared to for local state, because every user may not be currently active in every alertmanager instance due to sharding. Therefore, we only delete state for users which have also have no configuration. The user listing is re-used from the user sync. Signed-off-by: Steve Simpson <[email protected]>
1 parent 0595579 commit 7304ba0

File tree

8 files changed

+162
-5
lines changed

8 files changed

+162
-5
lines changed

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"io/ioutil"
7+
"strings"
78
"sync"
89

910
"github.com/go-kit/kit/log"
@@ -123,6 +124,18 @@ func (s *BucketAlertStore) DeleteAlertConfig(ctx context.Context, userID string)
123124
return err
124125
}
125126

127+
// ListUsersWithState implements alertstore.AlertStore.
128+
func (s *BucketAlertStore) ListUsersWithState(ctx context.Context) ([]string, error) {
129+
var userIDs []string
130+
131+
err := s.amBucket.Iter(ctx, "", func(key string) error {
132+
userIDs = append(userIDs, strings.TrimRight(key, "/"))
133+
return nil
134+
})
135+
136+
return userIDs, err
137+
}
138+
126139
// GetFullState implements alertstore.AlertStore.
127140
func (s *BucketAlertStore) GetFullState(ctx context.Context, userID string) (alertspb.FullStateDesc, error) {
128141
bkt := s.getAlertmanagerUserBucket(userID)

pkg/alertmanager/alertstore/configdb/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ func (c *Store) DeleteAlertConfig(ctx context.Context, user string) error {
9393
return errReadOnly
9494
}
9595

96+
// ListUsersWithState implements alertstore.AlertStore.
97+
func (c *Store) ListUsersWithState(ctx context.Context) ([]string, error) {
98+
return nil, errState
99+
}
100+
96101
// GetFullState implements alertstore.AlertStore.
97102
func (c *Store) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
98103
return alertspb.FullStateDesc{}, errState

pkg/alertmanager/alertstore/local/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ func (f *Store) DeleteAlertConfig(_ context.Context, user string) error {
101101
return errReadOnly
102102
}
103103

104+
// ListUsersWithState implements alertstore.AlertStore.
105+
func (f *Store) ListUsersWithState(ctx context.Context) ([]string, error) {
106+
return nil, errState
107+
}
108+
104109
// GetFullState implements alertstore.AlertStore.
105110
func (f *Store) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
106111
return alertspb.FullStateDesc{}, errState

pkg/alertmanager/alertstore/objectclient/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ func (a *AlertStore) DeleteAlertConfig(ctx context.Context, user string) error {
142142
return err
143143
}
144144

145+
// ListUsersWithState implements alertstore.AlertStore.
146+
func (a *AlertStore) ListUsersWithState(ctx context.Context) ([]string, error) {
147+
return nil, errState
148+
}
149+
145150
// GetFullState implements alertstore.AlertStore.
146151
func (a *AlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
147152
return alertspb.FullStateDesc{}, errState

pkg/alertmanager/alertstore/store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type AlertStore interface {
4040
// If configuration for the user doesn't exist, no error is reported.
4141
DeleteAlertConfig(ctx context.Context, user string) error
4242

43+
// ListUsersWithState returns the list of users which have had state written.
44+
ListUsersWithState(ctx context.Context) ([]string, error)
45+
4346
// GetFullState loads and returns the alertmanager state for the given user.
4447
GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error)
4548

pkg/alertmanager/alertstore/store_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
220220

221221
_, err = store.GetFullState(ctx, "user-2")
222222
assert.Equal(t, alertspb.ErrNotFound, err)
223+
224+
users, err := store.ListUsersWithState(ctx)
225+
assert.NoError(t, err)
226+
assert.ElementsMatch(t, []string{}, users)
223227
}
224228

225229
// The storage contains users.
@@ -244,6 +248,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
244248
exists, err = bucket.Exists(ctx, "alertmanager/user-2/fullstate")
245249
require.NoError(t, err)
246250
assert.True(t, exists)
251+
252+
users, err := store.ListUsersWithState(ctx)
253+
assert.NoError(t, err)
254+
assert.ElementsMatch(t, []string{"user-1", "user-2"}, users)
247255
}
248256

249257
// The storage has had user-1 deleted.
@@ -258,6 +266,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
258266
require.NoError(t, err)
259267
assert.Equal(t, state2, res)
260268

269+
users, err := store.ListUsersWithState(ctx)
270+
assert.NoError(t, err)
271+
assert.ElementsMatch(t, []string{"user-2"}, users)
272+
261273
// Delete again (should be idempotent).
262274
require.NoError(t, store.DeleteFullState(ctx, "user-1"))
263275
}

pkg/alertmanager/multitenant.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncR
663663
level.Info(am.logger).Log("msg", "synchronizing alertmanager configs for users")
664664
am.syncTotal.WithLabelValues(syncReason).Inc()
665665

666-
cfgs, err := am.loadAlertmanagerConfigs(ctx)
666+
allUsers, cfgs, err := am.loadAlertmanagerConfigs(ctx)
667667
if err != nil {
668668
am.syncFailures.WithLabelValues(syncReason).Inc()
669669
return err
@@ -672,6 +672,13 @@ func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncR
672672
am.syncConfigs(cfgs)
673673
am.deleteUnusedLocalUserState()
674674

675+
// Currently, remote state persistence is only used when sharding is enabled.
676+
if am.cfg.ShardingEnabled {
677+
// Note when cleaning up remote state, remember that the user may not necessarily be configured
678+
// in this instance. Therefore, pass the list of _all_ configured users to filter by.
679+
am.deleteUnusedRemoteUserState(ctx, allUsers)
680+
}
681+
675682
return nil
676683
}
677684

@@ -697,11 +704,11 @@ func (am *MultitenantAlertmanager) stopping(_ error) error {
697704
}
698705

699706
// loadAlertmanagerConfigs Loads (and filters) the alertmanagers configuration from object storage, taking into consideration the sharding strategy.
700-
func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) (map[string]alertspb.AlertConfigDesc, error) {
707+
func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) ([]string, map[string]alertspb.AlertConfigDesc, error) {
701708
// Find all users with an alertmanager config.
702709
userIDs, err := am.store.ListAllUsers(ctx)
703710
if err != nil {
704-
return nil, errors.Wrap(err, "failed to list users with alertmanager configuration")
711+
return nil, nil, errors.Wrap(err, "failed to list users with alertmanager configuration")
705712
}
706713
numUsersDiscovered := len(userIDs)
707714

@@ -719,12 +726,12 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)
719726
// Load the configs for the owned users.
720727
configs, err := am.store.GetAlertConfigs(ctx, userIDs)
721728
if err != nil {
722-
return nil, errors.Wrapf(err, "failed to load alertmanager configurations for owned users")
729+
return nil, nil, errors.Wrapf(err, "failed to load alertmanager configurations for owned users")
723730
}
724731

725732
am.tenantsDiscovered.Set(float64(numUsersDiscovered))
726733
am.tenantsOwned.Set(float64(numUsersOwned))
727-
return configs, nil
734+
return userIDs, configs, nil
728735
}
729736

730737
func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
@@ -1147,6 +1154,35 @@ func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *cluste
11471154
return &alertmanagerpb.UpdateStateResponse{Status: alertmanagerpb.OK}, nil
11481155
}
11491156

1157+
// deleteUnusedRemoteUserState deletes state objects in remote storage for users that are no longer configured.
1158+
1159+
func (am *MultitenantAlertmanager) deleteUnusedRemoteUserState(ctx context.Context, allUsers []string) {
1160+
1161+
users := make(map[string]struct{}, len(allUsers))
1162+
for _, userID := range allUsers {
1163+
users[userID] = struct{}{}
1164+
}
1165+
1166+
usersWithState, err := am.store.ListUsersWithState(ctx)
1167+
if err != nil {
1168+
level.Warn(am.logger).Log("msg", "failed to list users with state", "err", err)
1169+
return
1170+
}
1171+
1172+
for _, userID := range usersWithState {
1173+
if _, ok := users[userID]; ok {
1174+
continue
1175+
}
1176+
1177+
err := am.store.DeleteFullState(ctx, userID)
1178+
if err != nil {
1179+
level.Warn(am.logger).Log("msg", "failed to delete remote state for user", "user", userID, "err", err)
1180+
} else {
1181+
level.Info(am.logger).Log("msg", "deleted remote state for user", "user", userID)
1182+
}
1183+
}
1184+
}
1185+
11501186
// deleteUnusedLocalUserState deletes local files for users that we no longer need.
11511187
func (am *MultitenantAlertmanager) deleteUnusedLocalUserState() {
11521188
userDirs := am.getPerUserDirectories()

pkg/alertmanager/multitenant_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,82 @@ func TestMultitenantAlertmanager_deleteUnusedLocalUserState(t *testing.T) {
571571
require.NotZero(t, dirs[user2]) // has config, files survived
572572
}
573573

574+
func TestMultitenantAlertmanager_deleteUnusedRemoteUserState(t *testing.T) {
575+
ctx := context.Background()
576+
577+
const (
578+
user1 = "user1"
579+
user2 = "user2"
580+
)
581+
582+
alertStore := prepareInMemoryAlertStore()
583+
ringStore := consul.NewInMemoryClient(ring.GetCodec())
584+
reg := prometheus.NewPedanticRegistry()
585+
cfg := mockAlertmanagerConfig(t)
586+
587+
cfg.ShardingRing.ReplicationFactor = 1
588+
cfg.ShardingRing.InstanceID = "instance"
589+
cfg.ShardingRing.InstanceAddr = "127.0.0.1"
590+
cfg.ShardingEnabled = true
591+
592+
// Increase state write interval so that state gets written sooner, making test faster.
593+
cfg.Persister.Interval = 500 * time.Millisecond
594+
595+
am, err := createMultitenantAlertmanager(cfg, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), reg)
596+
require.NoError(t, err)
597+
t.Cleanup(func() {
598+
require.NoError(t, services.StopAndAwaitTerminated(ctx, am))
599+
})
600+
require.NoError(t, services.StartAndAwaitRunning(ctx, am))
601+
602+
// Configure the users and wait for the state persister to write some state for both.
603+
{
604+
require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
605+
User: user1,
606+
RawConfig: simpleConfigOne,
607+
Templates: []*alertspb.TemplateDesc{},
608+
}))
609+
require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
610+
User: user2,
611+
RawConfig: simpleConfigOne,
612+
Templates: []*alertspb.TemplateDesc{},
613+
}))
614+
615+
err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
616+
require.NoError(t, err)
617+
618+
require.Eventually(t, func() bool {
619+
_, err1 := alertStore.GetFullState(context.Background(), user1)
620+
_, err2 := alertStore.GetFullState(context.Background(), user2)
621+
return err1 == nil && err2 == nil
622+
}, 5*time.Second, 100*time.Millisecond, "timed out waiting for state to be persisted")
623+
}
624+
625+
// Perform another sync to trigger cleanup; this should have no effect.
626+
{
627+
err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
628+
require.NoError(t, err)
629+
630+
_, err = alertStore.GetFullState(context.Background(), user1)
631+
require.NoError(t, err)
632+
_, err = alertStore.GetFullState(context.Background(), user2)
633+
require.NoError(t, err)
634+
}
635+
636+
// Delete one configuration and trigger cleanup; state for only that user should be deleted.
637+
{
638+
require.NoError(t, alertStore.DeleteAlertConfig(ctx, user1))
639+
640+
err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
641+
require.NoError(t, err)
642+
643+
_, err = alertStore.GetFullState(context.Background(), user1)
644+
require.Equal(t, alertspb.ErrNotFound, err)
645+
_, err = alertStore.GetFullState(context.Background(), user2)
646+
require.NoError(t, err)
647+
}
648+
}
649+
574650
func createFile(t *testing.T, path string) string {
575651
dir := filepath.Dir(path)
576652
require.NoError(t, os.MkdirAll(dir, 0777))
@@ -834,6 +910,7 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
834910
require.True(t, am.ringLifecycler.IsRegistered())
835911
require.Equal(t, ring.JOINING.String(), am.ringLifecycler.GetState().String())
836912
})
913+
bkt.MockIter("alertmanager/", nil, nil)
837914

838915
// Once successfully started, the instance should be ACTIVE in the ring.
839916
require.NoError(t, services.StartAndAwaitRunning(ctx, am))
@@ -1184,6 +1261,7 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {
11841261
// Mock the store to fail listing configs.
11851262
bkt := &bucket.ClientMock{}
11861263
bkt.MockIter("alerts/", nil, errors.New("failed to list alerts"))
1264+
bkt.MockIter("alertmanager/", nil, nil)
11871265
store := bucketclient.NewBucketAlertStore(bkt, nil, log.NewNopLogger())
11881266

11891267
am, err := createMultitenantAlertmanager(amConfig, nil, nil, store, ringStore, nil, log.NewNopLogger(), nil)

0 commit comments

Comments
 (0)