Delete alertmanager state objects from remote storage on user deletion. #4167

Merged (3 commits) on May 18, 2021.

Changes from all commits:
13 changes: 13 additions & 0 deletions pkg/alertmanager/alertstore/bucketclient/bucket_client.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io/ioutil"
"strings"
"sync"

"github.com/go-kit/kit/log"
@@ -123,6 +124,18 @@ func (s *BucketAlertStore) DeleteAlertConfig(ctx context.Context, userID string)
return err
}

// ListUsersWithFullState implements alertstore.AlertStore.
func (s *BucketAlertStore) ListUsersWithFullState(ctx context.Context) ([]string, error) {
var userIDs []string

err := s.amBucket.Iter(ctx, "", func(key string) error {
userIDs = append(userIDs, strings.TrimRight(key, "/"))
return nil
})

return userIDs, err
}

// GetFullState implements alertstore.AlertStore.
func (s *BucketAlertStore) GetFullState(ctx context.Context, userID string) (alertspb.FullStateDesc, error) {
bkt := s.getAlertmanagerUserBucket(userID)
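For context on the trailing-slash handling above: object-store clients typically return a non-recursive listing as one key per tenant "directory", each ending in "/", which is why ListUsersWithFullState trims that suffix to recover plain user IDs. Below is a minimal, self-contained sketch of the same trimming; the fakeIter helper is hypothetical and merely stands in for the real bucket client.

```go
// Hypothetical illustration (not part of this PR): a fake Iter that yields
// per-tenant "directory" keys the way an object-store listing typically does,
// i.e. one key per tenant with a trailing "/".
package main

import (
	"fmt"
	"strings"
)

// fakeIter mimics a non-recursive bucket Iter over the root of the
// alertmanager state bucket.
func fakeIter(f func(key string) error) error {
	for _, key := range []string{"user-1/", "user-2/"} {
		if err := f(key); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var userIDs []string
	_ = fakeIter(func(key string) error {
		// Same trimming as ListUsersWithFullState: "user-1/" -> "user-1".
		userIDs = append(userIDs, strings.TrimRight(key, "/"))
		return nil
	})
	fmt.Println(userIDs) // [user-1 user-2]
}
```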
5 changes: 5 additions & 0 deletions pkg/alertmanager/alertstore/configdb/store.go
@@ -93,6 +93,11 @@ func (c *Store) DeleteAlertConfig(ctx context.Context, user string) error {
return errReadOnly
}

// ListUsersWithFullState implements alertstore.AlertStore.
func (c *Store) ListUsersWithFullState(ctx context.Context) ([]string, error) {
return nil, errState
}

// GetFullState implements alertstore.AlertStore.
func (c *Store) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
return alertspb.FullStateDesc{}, errState
5 changes: 5 additions & 0 deletions pkg/alertmanager/alertstore/local/store.go
@@ -101,6 +101,11 @@ func (f *Store) DeleteAlertConfig(_ context.Context, user string) error {
return errReadOnly
}

// ListUsersWithFullState implements alertstore.AlertStore.
func (f *Store) ListUsersWithFullState(ctx context.Context) ([]string, error) {
return nil, errState
}

// GetFullState implements alertstore.AlertStore.
func (f *Store) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
return alertspb.FullStateDesc{}, errState
5 changes: 5 additions & 0 deletions pkg/alertmanager/alertstore/objectclient/store.go
@@ -142,6 +142,11 @@ func (a *AlertStore) DeleteAlertConfig(ctx context.Context, user string) error {
return err
}

// ListUsersWithFullState implements alertstore.AlertStore.
func (a *AlertStore) ListUsersWithFullState(ctx context.Context) ([]string, error) {
return nil, errState
}

// GetFullState implements alertstore.AlertStore.
func (a *AlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
return alertspb.FullStateDesc{}, errState
3 changes: 3 additions & 0 deletions pkg/alertmanager/alertstore/store.go
@@ -40,6 +40,9 @@ type AlertStore interface {
// If configuration for the user doesn't exist, no error is reported.
DeleteAlertConfig(ctx context.Context, user string) error

// ListUsersWithFullState returns the list of users which have had state written.
ListUsersWithFullState(ctx context.Context) ([]string, error)

// GetFullState loads and returns the alertmanager state for the given user.
GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error)

12 changes: 12 additions & 0 deletions pkg/alertmanager/alertstore/store_test.go
@@ -220,6 +220,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {

_, err = store.GetFullState(ctx, "user-2")
assert.Equal(t, alertspb.ErrNotFound, err)

users, err := store.ListUsersWithFullState(ctx)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{}, users)
}

// The storage contains users.
@@ -244,6 +248,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
exists, err = bucket.Exists(ctx, "alertmanager/user-2/fullstate")
require.NoError(t, err)
assert.True(t, exists)

users, err := store.ListUsersWithFullState(ctx)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"user-1", "user-2"}, users)
}

// The storage has had user-1 deleted.
@@ -258,6 +266,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, state2, res)

users, err := store.ListUsersWithFullState(ctx)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"user-2"}, users)

// Delete again (should be idempotent).
require.NoError(t, store.DeleteFullState(ctx, "user-1"))
}
67 changes: 51 additions & 16 deletions pkg/alertmanager/multitenant.go
@@ -663,7 +663,7 @@ func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncR
level.Info(am.logger).Log("msg", "synchronizing alertmanager configs for users")
am.syncTotal.WithLabelValues(syncReason).Inc()

cfgs, err := am.loadAlertmanagerConfigs(ctx)
allUsers, cfgs, err := am.loadAlertmanagerConfigs(ctx)
if err != nil {
am.syncFailures.WithLabelValues(syncReason).Inc()
return err
@@ -672,6 +672,13 @@
am.syncConfigs(cfgs)
am.deleteUnusedLocalUserState()

// Currently, remote state persistence is only used when sharding is enabled.
if am.cfg.ShardingEnabled {
// Note that when cleaning up remote state, a user may not necessarily be configured
// in this instance. Therefore, pass the list of _all_ configured users to filter by.
am.deleteUnusedRemoteUserState(ctx, allUsers)
}

return nil
}

@@ -696,35 +703,35 @@ func (am *MultitenantAlertmanager) stopping(_ error) error {
return nil
}

// loadAlertmanagerConfigs Loads (and filters) the alertmanagers configuration from object storage, taking into consideration the sharding strategy.
func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) (map[string]alertspb.AlertConfigDesc, error) {
// loadAlertmanagerConfigs loads (and filters) the alertmanager configurations from object storage, taking into consideration the sharding strategy. Returns:
// - The list of discovered users (all users with a configuration in storage)
// - The configurations of users owned by this instance.
func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) ([]string, map[string]alertspb.AlertConfigDesc, error) {
// Find all users with an alertmanager config.
userIDs, err := am.store.ListAllUsers(ctx)
allUserIDs, err := am.store.ListAllUsers(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to list users with alertmanager configuration")
return nil, nil, errors.Wrap(err, "failed to list users with alertmanager configuration")
}
numUsersDiscovered := len(userIDs)
numUsersDiscovered := len(allUserIDs)
ownedUserIDs := make([]string, 0, len(allUserIDs))

// Filter out users not owned by this shard.
for i := 0; i < len(userIDs); {
if !am.isUserOwned(userIDs[i]) {
userIDs = append(userIDs[:i], userIDs[i+1:]...)
continue
for _, userID := range allUserIDs {
if am.isUserOwned(userID) {
ownedUserIDs = append(ownedUserIDs, userID)
}

i++
}
numUsersOwned := len(userIDs)
numUsersOwned := len(ownedUserIDs)

// Load the configs for the owned users.
configs, err := am.store.GetAlertConfigs(ctx, userIDs)
configs, err := am.store.GetAlertConfigs(ctx, ownedUserIDs)
if err != nil {
return nil, errors.Wrapf(err, "failed to load alertmanager configurations for owned users")
return nil, nil, errors.Wrapf(err, "failed to load alertmanager configurations for owned users")
}

am.tenantsDiscovered.Set(float64(numUsersDiscovered))
am.tenantsOwned.Set(float64(numUsersOwned))
return configs, nil
return allUserIDs, configs, nil
}

func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
@@ -1147,6 +1154,34 @@ func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *cluste
return &alertmanagerpb.UpdateStateResponse{Status: alertmanagerpb.OK}, nil
}

// deleteUnusedRemoteUserState deletes state objects in remote storage for users that are no longer configured.
func (am *MultitenantAlertmanager) deleteUnusedRemoteUserState(ctx context.Context, allUsers []string) {
[Review thread on this line]

Reviewer (Contributor):
[food for thought] This will be executed on every alertmanager replica, so the number of ListUsersWithFullState() operations will scale linearly with the number of replicas. I'm wondering whether this cleanup would be enough if done only by the alertmanager replica holding a hardcoded token (e.g. token 0).

@pstibrany @stevesg What's your take on this? I'm dubious, so asking.

@stevesg (Contributor, author) commented on May 11, 2021:
It only needs to be done by one (any) instance; in fact, it's a completely independent process. I don't have a good answer as to whether the saving is worth the added complexity, though. One bucket iteration per PollInterval per replica doesn't seem excessive. Reducing the frequency of the cleanup would also help (e.g. an interval of 4x PollInterval).

Reviewer (Contributor):
Let's wait for @pstibrany's feedback. We need a 2nd reviewer anyway, so I wouldn't be able to merge the PR right now.

@pstibrany (Contributor) commented on May 18, 2021:
I don't have a strong opinion on whether to do it from a single instance or from all of them... doing it from all is certainly easier. To reduce the API operations, we can do it less often than on every sync, and add some jitter to the process.

users := make(map[string]struct{}, len(allUsers))
for _, userID := range allUsers {
users[userID] = struct{}{}
}

usersWithState, err := am.store.ListUsersWithFullState(ctx)
if err != nil {
level.Warn(am.logger).Log("msg", "failed to list users with state", "err", err)
return
}

for _, userID := range usersWithState {
if _, ok := users[userID]; ok {
continue
}

err := am.store.DeleteFullState(ctx, userID)
if err != nil {
level.Warn(am.logger).Log("msg", "failed to delete remote state for user", "user", userID, "err", err)
} else {
level.Info(am.logger).Log("msg", "deleted remote state for user", "user", userID)
}
}
}

// deleteUnusedLocalUserState deletes local files for users that we no longer need.
func (am *MultitenantAlertmanager) deleteUnusedLocalUserState() {
userDirs := am.getPerUserDirectories()
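Following up on the review thread above: the cleanup in this PR runs on every replica at every sync, and @pstibrany's suggestion (run it less often than every sync, with some jitter) is not implemented here. Purely as a hedged sketch of that idea, with every name, constant, and interval below being illustrative rather than taken from the codebase:

```go
// Hypothetical sketch only (not part of this PR): run the remote state cleanup
// on a slower, jittered schedule so replicas don't all list the bucket at once.
package cleanup

import (
	"context"
	"math/rand"
	"time"
)

// Illustrative value, e.g. a multiple of the configuration poll interval.
const cleanupInterval = 4 * (15 * time.Second)

// runRemoteStateCleanupLoop invokes cleanup once per interval, after a random
// initial delay that spreads replicas apart in time.
func runRemoteStateCleanupLoop(ctx context.Context, cleanup func(context.Context)) {
	jitter := time.Duration(rand.Int63n(int64(cleanupInterval)))
	select {
	case <-time.After(jitter):
	case <-ctx.Done():
		return
	}

	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()
	for {
		cleanup(ctx)
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}
```

Spreading replicas with a random initial delay keeps the total number of ListUsersWithFullState calls roughly constant per interval instead of spiking at every sync.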
95 changes: 95 additions & 0 deletions pkg/alertmanager/multitenant_test.go
@@ -571,6 +571,99 @@ func TestMultitenantAlertmanager_deleteUnusedLocalUserState(t *testing.T) {
require.NotZero(t, dirs[user2]) // has config, files survived
}

func TestMultitenantAlertmanager_deleteUnusedRemoteUserState(t *testing.T) {
ctx := context.Background()

const (
user1 = "user1"
user2 = "user2"
)

alertStore := prepareInMemoryAlertStore()
ringStore := consul.NewInMemoryClient(ring.GetCodec())

createInstance := func(i int) *MultitenantAlertmanager {
reg := prometheus.NewPedanticRegistry()
cfg := mockAlertmanagerConfig(t)

cfg.ShardingRing.ReplicationFactor = 1
cfg.ShardingRing.InstanceID = fmt.Sprintf("instance-%d", i)
cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.1-%d", i)
cfg.ShardingEnabled = true

// Shorten the state write interval so that state gets written sooner, making the test faster.
cfg.Persister.Interval = 500 * time.Millisecond

am, err := createMultitenantAlertmanager(cfg, nil, nil, alertStore, ringStore, nil, log.NewLogfmtLogger(os.Stdout), reg)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, am))
})
require.NoError(t, services.StartAndAwaitRunning(ctx, am))

return am
}

// Create two instances. With replication factor of 1, this means that only one
// of the instances will own the user. This tests that an instance does not delete
// state for users that are configured, but are owned by other instances.
am1 := createInstance(1)
am2 := createInstance(2)

// Configure the users and wait for the state persister to write some state for both.
{
require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
User: user1,
RawConfig: simpleConfigOne,
Templates: []*alertspb.TemplateDesc{},
}))
require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
User: user2,
RawConfig: simpleConfigOne,
Templates: []*alertspb.TemplateDesc{},
}))

err := am1.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
err = am2.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)

require.Eventually(t, func() bool {
_, err1 := alertStore.GetFullState(context.Background(), user1)
_, err2 := alertStore.GetFullState(context.Background(), user2)
return err1 == nil && err2 == nil
}, 5*time.Second, 100*time.Millisecond, "timed out waiting for state to be persisted")
}

// Perform another sync to trigger cleanup; this should have no effect.
{
err := am1.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
err = am2.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)

_, err = alertStore.GetFullState(context.Background(), user1)
require.NoError(t, err)
_, err = alertStore.GetFullState(context.Background(), user2)
require.NoError(t, err)
}

// Delete one configuration and trigger cleanup; state for only that user should be deleted.
{
require.NoError(t, alertStore.DeleteAlertConfig(ctx, user1))

err := am1.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
err = am2.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)

_, err = alertStore.GetFullState(context.Background(), user1)
require.Equal(t, alertspb.ErrNotFound, err)
_, err = alertStore.GetFullState(context.Background(), user2)
require.NoError(t, err)
}
}

func createFile(t *testing.T, path string) string {
dir := filepath.Dir(path)
require.NoError(t, os.MkdirAll(dir, 0777))
@@ -834,6 +927,7 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
require.True(t, am.ringLifecycler.IsRegistered())
require.Equal(t, ring.JOINING.String(), am.ringLifecycler.GetState().String())
})
bkt.MockIter("alertmanager/", nil, nil)

// Once successfully started, the instance should be ACTIVE in the ring.
require.NoError(t, services.StartAndAwaitRunning(ctx, am))
@@ -1184,6 +1278,7 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {
// Mock the store to fail listing configs.
bkt := &bucket.ClientMock{}
bkt.MockIter("alerts/", nil, errors.New("failed to list alerts"))
bkt.MockIter("alertmanager/", nil, nil)
store := bucketclient.NewBucketAlertStore(bkt, nil, log.NewNopLogger())

am, err := createMultitenantAlertmanager(amConfig, nil, nil, store, ringStore, nil, log.NewNopLogger(), nil)
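A note on the two bkt.MockIter("alertmanager/", nil, nil) stubs added above: with sharding enabled, loadAndSyncConfigs now also calls deleteUnusedRemoteUserState, which lists state objects under the "alertmanager/" prefix of the same bucket, so the mocked bucket needs an expectation for that prefix in addition to "alerts/". The sketch below only reuses calls that already appear in this diff; the import paths are assumptions based on the surrounding code and may not match the repository layout exactly.

```go
package alertmanagertest

import (
	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
	"github.com/cortexproject/cortex/pkg/storage/bucket"
	"github.com/go-kit/kit/log"
)

// newMockedAlertStore builds an alert store backed by a mocked bucket, stubbing
// both prefixes the sharded sync path touches: "alerts/" (configurations) and
// "alertmanager/" (per-user state objects, listed by ListUsersWithFullState).
func newMockedAlertStore() *bucketclient.BucketAlertStore {
	bkt := &bucket.ClientMock{}
	bkt.MockIter("alerts/", nil, nil)
	bkt.MockIter("alertmanager/", nil, nil)
	return bucketclient.NewBucketAlertStore(bkt, nil, log.NewNopLogger())
}
```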