Commit cac96f5

56quarters authored and mason committed
Restore alertmanager state from storage as fallback (grafana#2293)

* Restore alertmanager state from storage as fallback

  In cortexproject/cortex#3925 the ability to restore alertmanager state from peer alertmanagers was added, short-circuiting if there is only a single replica of the alertmanager. In cortexproject/cortex#4021 a fallback to read state from storage was added in case reading from peers failed. However, the short-circuiting if there is only a single peer was not removed. This has the effect of never restoring state in an alertmanager if only running a single replica.

  Fixes grafana#2245

  Signed-off-by: Nick Pillitteri <[email protected]>

* Code review changes

  Signed-off-by: Nick Pillitteri <[email protected]>
1 parent 7ff1a55 commit cac96f5
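
Not part of the commit, just orientation for the diffs below: a minimal, self-contained Go sketch of the control flow this change establishes. The names here (peerReader, stateStore, restoreState, errStoreNotFound) are simplified stand-ins, not Mimir's actual Replicator and alertstore.AlertStore interfaces. Peers are consulted only when the replication factor is greater than 1, and storage is always tried afterwards when peer sync does not settle the state, which is the part that was previously skipped for a single replica.

package statefallback

import (
	"context"
	"errors"
)

// Simplified stand-ins for the peer replicator and the object-storage client.
type peerReader interface {
	ReadFullStateForUser(ctx context.Context, userID string) ([]byte, error)
}

type stateStore interface {
	GetFullState(ctx context.Context, userID string) ([]byte, error)
}

// Plays the role of alertspb.ErrNotFound in this sketch.
var errStoreNotFound = errors.New("alertmanager full state not found in storage")

// restoreState mirrors the post-fix starting() flow: read from peer replicas
// only when replicationFactor > 1, and fall back to storage whenever the peer
// sync did not succeed, including the single-replica case that previously
// returned early and skipped storage entirely.
func restoreState(ctx context.Context, replicationFactor int, peers peerReader, store stateStore, userID string) ([]byte, error) {
	if replicationFactor > 1 {
		st, err := peers.ReadFullStateForUser(ctx, userID)
		if err == nil {
			return st, nil // state settled from a peer replica
		}
		// The real code only counts this as a failure when the error is not
		// "user not found in any replica", then falls through to storage.
	}

	// With the fix, a single replica reaches this point instead of returning early.
	st, err := store.GetFullState(ctx, userID)
	if errors.Is(err, errStoreNotFound) {
		return nil, nil // nothing to restore; start with empty state
	}
	return st, err
}

The actual change in pkg/alertmanager/state_replication.go below keeps the existing metrics and logging around this flow.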

File tree

4 files changed (+80, -77 lines)


CHANGELOG.md (+1)

@@ -8,6 +8,7 @@
 * [CHANGE] Ruler: Remove unused CLI flags `-ruler.search-pending-for` and `-ruler.flush-period` (and their respective YAML config options). #2288
 * [ENHANCEMENT] Alertmanager: Allow the HTTP `proxy_url` configuration option in the receiver's configuration. #2317
 * [BUGFIX] Compactor: log the actual error on compaction failed. #2261
+* [BUGFIX] Alertmanager: restore state from storage even when running a single replica. #2293
 
 ### Mixin
 

pkg/alertmanager/alertmanager_test.go (+2)

@@ -65,6 +65,7 @@ func createAlertmanagerAndSendAlerts(t *testing.T, alertGroups, groupsLimit, exp
 		TenantDataDir:     t.TempDir(),
 		ExternalURL:       &url.URL{Path: "/am"},
 		ShardingEnabled:   true,
+		Store:             prepareInMemoryAlertStore(),
 		Replicator:        &stubReplicator{},
 		ReplicationFactor: 1,
 		// We have to set this interval non-zero, though we don't need the persister to do anything.
@@ -148,6 +149,7 @@ func TestDispatcherLoggerInsightKey(t *testing.T) {
 		TenantDataDir:     t.TempDir(),
 		ExternalURL:       &url.URL{Path: "/am"},
 		ShardingEnabled:   true,
+		Store:             prepareInMemoryAlertStore(),
 		Replicator:        &stubReplicator{},
 		ReplicationFactor: 1,
 		PersisterConfig:   PersisterConfig{Interval: time.Hour},

pkg/alertmanager/state_replication.go (+31, -32)

@@ -71,7 +71,7 @@ type state struct {
 func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state {
 
 	s := &state{
-		logger:            l,
+		logger:            log.With(l, "user", userID),
 		userID:            userID,
 		replicationFactor: rf,
 		replicator:        re,
@@ -199,45 +199,44 @@ func (s *state) starting(ctx context.Context) error {
 	timer := prometheus.NewTimer(s.initialSyncDuration)
 	defer timer.ObserveDuration()
 
-	level.Info(s.logger).Log("msg", "Waiting for notification and silences to settle...")
-
-	// If the replication factor is <= 1, there is nowhere to obtain the state from.
-	if s.replicationFactor <= 1 {
-		level.Info(s.logger).Log("msg", "skipping settling (no replicas)")
-		return nil
-	}
-
-	// We can check other alertmanager(s) and explicitly ask them to propagate their state to us if available.
-	readCtx, cancel := context.WithTimeout(ctx, s.settleReadTimeout)
-	defer cancel()
+	// If replication factor is > 1 attempt to read state from other replicas, falling back to reading from
+	// storage if they are unavailable.
+	if s.replicationFactor > 1 {
+		level.Info(s.logger).Log("msg", "Waiting for notification and silences to settle...")
+
+		// We can check other alertmanager(s) and explicitly ask them to propagate their state to us if available.
+		readCtx, cancel := context.WithTimeout(ctx, s.settleReadTimeout)
+		defer cancel()
+
+		s.fetchReplicaStateTotal.Inc()
+		fullStates, err := s.replicator.ReadFullStateForUser(readCtx, s.userID)
+		if err == nil {
+			if err = s.mergeFullStates(fullStates); err == nil {
+				level.Info(s.logger).Log("msg", "state settled; proceeding")
+				s.initialSyncCompleted.WithLabelValues(syncFromReplica).Inc()
+				return nil
+			}
+		}
 
-	s.fetchReplicaStateTotal.Inc()
-	fullStates, err := s.replicator.ReadFullStateForUser(readCtx, s.userID)
-	if err == nil {
-		if err = s.mergeFullStates(fullStates); err == nil {
-			level.Info(s.logger).Log("msg", "state settled; proceeding")
-			s.initialSyncCompleted.WithLabelValues(syncFromReplica).Inc()
-			return nil
+		// The user not being found in all of the replicas is not recorded as a failure, as this is
+		// expected when this is the first replica to come up for a user. Note that it is important
+		// to continue and try to read from the state from remote storage, as the replicas may have
+		// lost state due to an all-replica restart.
+		if err != errAllReplicasUserNotFound {
+			s.fetchReplicaStateFailed.Inc()
 		}
-	}
 
-	// The user not being found in all of the replicas is not recorded as a failure, as this is
-	// expected when this is the first replica to come up for a user. Note that it is important
-	// to continue and try to read from the state from remote storage, as the replicas may have
-	// lost state due to an all-replica restart.
-	if err != errAllReplicasUserNotFound {
-		s.fetchReplicaStateFailed.Inc()
+		level.Info(s.logger).Log("msg", "unable to read state from other Alertmanager replicas; trying to read from storage", "err", err)
 	}
 
-	level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
-
+	level.Info(s.logger).Log("msg", "reading state from storage")
 	// Attempt to read the state from persistent storage instead.
 	storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
 	defer cancel()
 
 	fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
 	if errors.Is(err, alertspb.ErrNotFound) {
-		level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID)
+		level.Info(s.logger).Log("msg", "no state for user in storage; proceeding")
 		s.initialSyncCompleted.WithLabelValues(syncUserNotFound).Inc()
 		return nil
 	}
@@ -271,11 +270,11 @@ func (s *state) mergeFullStates(fs []*clusterpb.FullState) error {
 
 	for _, f := range fs {
 		for _, p := range f.Parts {
-			level.Debug(s.logger).Log("msg", "merging full state", "user", s.userID, "key", p.Key, "bytes", len(p.Data))
+			level.Debug(s.logger).Log("msg", "merging full state", "key", p.Key, "bytes", len(p.Data))
 
 			st, ok := s.states[p.Key]
 			if !ok {
-				level.Error(s.logger).Log("msg", "key not found while merging full state", "user", s.userID, "key", p.Key)
+				level.Error(s.logger).Log("msg", "key not found while merging full state", "key", p.Key)
 				continue
 			}
 
@@ -300,7 +299,7 @@ func (s *state) running(ctx context.Context) error {
 			s.stateReplicationTotal.WithLabelValues(p.Key).Inc()
 			if err := s.replicator.ReplicateStateForUser(ctx, s.userID, p); err != nil {
 				s.stateReplicationFailed.WithLabelValues(p.Key).Inc()
-				level.Error(s.logger).Log("msg", "failed to replicate state to other alertmanagers", "user", s.userID, "key", p.Key, "err", err)
+				level.Error(s.logger).Log("msg", "failed to replicate state to other alertmanagers", "key", p.Key, "err", err)
 			}
 		case <-ctx.Done():
 			return nil
pkg/alertmanager/state_replication_test.go (+46, -45)

@@ -29,6 +29,8 @@ import (
 	"github.com/grafana/mimir/pkg/alertmanager/alertstore"
 )
 
+const testUserID = "user-1"
+
 type fakeState struct {
 	binary []byte
 	merges [][]byte
@@ -73,8 +75,8 @@ func (f *fakeReplicator) GetPositionForUser(_ string) int {
 }
 
 func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
-	if userID != "user-1" {
-		return nil, errors.New("Unexpected userID")
+	if userID != testUserID {
+		return nil, errors.New("unexpected userID")
 	}
 
 	if f.read.blocking {
@@ -96,31 +98,39 @@ func newFakeAlertStore() *fakeAlertStore {
 	}
 }
 
-func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
+func (f *fakeAlertStore) GetFullState(_ context.Context, user string) (alertspb.FullStateDesc, error) {
 	if result, ok := f.states[user]; ok {
 		return result, nil
 	}
 	return alertspb.FullStateDesc{}, alertspb.ErrNotFound
 }
 
+func (f *fakeAlertStore) SetFullState(_ context.Context, user string, state alertspb.FullStateDesc) error {
+	f.states[user] = state
+	return nil
+}
+
 func TestStateReplication(t *testing.T) {
 	tc := []struct {
-		name              string
-		replicationFactor int
-		message           *clusterpb.Part
-		results           map[string]*clusterpb.Part
+		name               string
+		replicationFactor  int
+		message            *clusterpb.Part
+		replicationResults map[string]clusterpb.Part
+		storeResults       map[string]clusterpb.Part
 	}{
 		{
-			name:              "with a replication factor of <= 1, state is not replicated.",
-			replicationFactor: 1,
-			message:           &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
-			results:           map[string]*clusterpb.Part{},
+			name:               "with a replication factor of <= 1, state is not replicated but loaded from storage.",
+			replicationFactor:  1,
+			message:            &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
+			replicationResults: map[string]clusterpb.Part{},
+			storeResults:       map[string]clusterpb.Part{testUserID: {Key: "nflog", Data: []byte("OK")}},
 		},
 		{
-			name:              "with a replication factor of > 1, state is broadcasted for replication.",
-			replicationFactor: 3,
-			message:           &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
-			results:           map[string]*clusterpb.Part{"user-1": {Key: "nflog", Data: []byte("OK")}},
+			name:               "with a replication factor of > 1, state is broadcasted for replication.",
+			replicationFactor:  3,
+			message:            &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
+			replicationResults: map[string]clusterpb.Part{testUserID: {Key: "nflog", Data: []byte("OK")}},
+			storeResults:       map[string]clusterpb.Part{},
 		},
 	}
 
@@ -129,9 +139,15 @@ func TestStateReplication(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
 			replicator := newFakeReplicator()
 			replicator.read = readStateResult{res: nil, err: nil}
+
 			store := newFakeAlertStore()
-			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
+			for user, part := range tt.storeResults {
+				require.NoError(t, store.SetFullState(context.Background(), user, alertspb.FullStateDesc{
+					State: &clusterpb.FullState{Parts: []clusterpb.Part{part}},
+				}))
+			}
 
+			s := newReplicatedStates(testUserID, tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
 			require.False(t, s.Ready())
 			{
 				ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
@@ -161,47 +177,32 @@
 			require.Eventually(t, func() bool {
 				replicator.mtx.Lock()
 				defer replicator.mtx.Unlock()
-				return len(replicator.results) == len(tt.results)
+				return len(replicator.results) == len(tt.replicationResults)
 			}, time.Second, time.Millisecond)
 
 			if tt.replicationFactor > 1 {
+				// If the replication factor is greater than 1, we expect state to be loaded from other Alertmanagers
 				assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
-# HELP alertmanager_state_fetch_replica_state_failed_total Number of times we have failed to read and merge the full state from another replica.
-# TYPE alertmanager_state_fetch_replica_state_failed_total counter
-alertmanager_state_fetch_replica_state_failed_total 0
-# HELP alertmanager_state_fetch_replica_state_total Number of times we have tried to read and merge the full state from another replica.
-# TYPE alertmanager_state_fetch_replica_state_total counter
-alertmanager_state_fetch_replica_state_total 1
-# HELP alertmanager_partial_state_merges_failed_total Number of times we have failed to merge a partial state received for a key.
-# TYPE alertmanager_partial_state_merges_failed_total counter
-alertmanager_partial_state_merges_failed_total{key="nflog"} 0
-# HELP alertmanager_partial_state_merges_total Number of times we have received a partial state to merge for a key.
-# TYPE alertmanager_partial_state_merges_total counter
-alertmanager_partial_state_merges_total{key="nflog"} 0
 # HELP alertmanager_state_initial_sync_completed_total Number of times we have completed syncing initial state for each possible outcome.
 # TYPE alertmanager_state_initial_sync_completed_total counter
 alertmanager_state_initial_sync_completed_total{outcome="failed"} 0
 alertmanager_state_initial_sync_completed_total{outcome="from-replica"} 1
 alertmanager_state_initial_sync_completed_total{outcome="from-storage"} 0
 alertmanager_state_initial_sync_completed_total{outcome="user-not-found"} 0
-# HELP alertmanager_state_initial_sync_total Number of times we have tried to sync initial state from peers or remote storage.
-# TYPE alertmanager_state_initial_sync_total counter
-alertmanager_state_initial_sync_total 1
-# HELP alertmanager_state_replication_failed_total Number of times we have failed to replicate a state to other alertmanagers.
-# TYPE alertmanager_state_replication_failed_total counter
-alertmanager_state_replication_failed_total{key="nflog"} 0
-# HELP alertmanager_state_replication_total Number of times we have tried to replicate a state to other alertmanagers.
-# TYPE alertmanager_state_replication_total counter
-alertmanager_state_replication_total{key="nflog"} 1
 `),
-					"alertmanager_state_fetch_replica_state_failed_total",
-					"alertmanager_state_fetch_replica_state_total",
-					"alertmanager_partial_state_merges_failed_total",
-					"alertmanager_partial_state_merges_total",
 					"alertmanager_state_initial_sync_completed_total",
-					"alertmanager_state_initial_sync_total",
-					"alertmanager_state_replication_failed_total",
-					"alertmanager_state_replication_total",
+				))
+			} else {
+				// Replication factor is 1, we expect state to be loaded from storage *instead* of other Alertmanagers
+				assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
+# HELP alertmanager_state_initial_sync_completed_total Number of times we have completed syncing initial state for each possible outcome.
+# TYPE alertmanager_state_initial_sync_completed_total counter
+alertmanager_state_initial_sync_completed_total{outcome="failed"} 0
+alertmanager_state_initial_sync_completed_total{outcome="from-replica"} 0
+alertmanager_state_initial_sync_completed_total{outcome="from-storage"} 1
+alertmanager_state_initial_sync_completed_total{outcome="user-not-found"} 0
+`),
+					"alertmanager_state_initial_sync_completed_total",
 				))
 
 			}

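As a usage note, again a sketch rather than Mimir's real alertstore.AlertStore and alertspb.FullStateDesc types: the test's fakeAlertStore gains a SetFullState so the replication-factor-1 case can seed storage before start-up. A minimal standalone equivalent of that in-memory store, continuing the hypothetical statefallback package from the sketch near the top, plus how it feeds the fallback:

package statefallback

import (
	"context"
	"sync"
)

// memStore is an in-memory full-state store in the spirit of the test's
// fakeAlertStore; state is stored as opaque bytes to keep the sketch small.
type memStore struct {
	mu     sync.Mutex
	states map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{states: map[string][]byte{}}
}

// GetFullState returns the stored state, or errStoreNotFound when the user has
// no state yet; the startup fallback treats that as "nothing to restore".
func (m *memStore) GetFullState(_ context.Context, userID string) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if st, ok := m.states[userID]; ok {
		return st, nil
	}
	return nil, errStoreNotFound
}

// SetFullState persists the state, as the periodic persister does in Mimir.
func (m *memStore) SetFullState(_ context.Context, userID string, state []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.states[userID] = state
	return nil
}

// exampleSingleReplicaRestore mirrors the new replication-factor-1 test case:
// seed the store, then restore with a single replica and get the bytes back.
func exampleSingleReplicaRestore(ctx context.Context) ([]byte, error) {
	store := newMemStore()
	_ = store.SetFullState(ctx, "user-1", []byte("nflog state"))
	return restoreState(ctx, 1, nil, store, "user-1")
}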