
Commit 9bb1ed6

Restore alertmanager state from storage as fallback
In cortexproject/cortex#3925 the ability to restore alertmanager state from peer alertmanagers was added, short-circuiting if there is only a single replica of the alertmanager. In cortexproject/cortex#4021 a fallback to read state from storage was added in case reading from peers failed. However, the short-circuit for a single replica was never removed, so an alertmanager running only a single replica never restored its state.

Fixes #2245

Signed-off-by: Nick Pillitteri <[email protected]>
1 parent d08f7da commit 9bb1ed6
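
To make the state_replication.go diff below easier to follow, here is a condensed sketch of the control flow in (*state).starting after this change. It is not the verbatim implementation: timeouts, metrics, and logging are elided, and readFullStateFromStorage is a hypothetical stand-in for the storage-read block that closes the real function.

	// Condensed sketch only; see the state_replication.go diff below for the actual code.
	func (s *state) starting(ctx context.Context) error {
		// Peer settling is now guarded by the replication factor instead of
		// returning early, so the storage fallback below always runs.
		if s.replicationFactor > 1 {
			if fullStates, err := s.replicator.ReadFullStateForUser(ctx, s.userID); err == nil {
				if err := s.mergeFullStates(fullStates); err == nil {
					return nil // state settled from a peer replica
				}
			}
			// Peers failed or had no state for this user: fall through to storage.
		}

		// Before this fix, a replicationFactor <= 1 check returned early above,
		// so a single-replica alertmanager never reached the storage read.
		return s.readFullStateFromStorage(ctx) // hypothetical helper for the storage-read block
	}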

File tree: 4 files changed, 70 additions and 70 deletions

  CHANGELOG.md
  pkg/alertmanager/alertmanager_test.go
  pkg/alertmanager/state_replication.go
  pkg/alertmanager/state_replication_test.go

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@
 * [CHANGE] Ruler: Remove unused CLI flags `-ruler.search-pending-for` and `-ruler.flush-period` (and their respective YAML config options). #2288
 * [ENHANCEMENT] Alertmanager: Allow the HTTP `proxy_url` configuration option in the receiver's configuration. #2317
 * [BUGFIX] Compactor: log the actual error on compaction failed. #2261
+* [BUGFIX] Alertmanager: restore state from storage even when running a single replica. #2293
 
 ### Mixin
 
pkg/alertmanager/alertmanager_test.go

Lines changed: 2 additions & 0 deletions
@@ -65,6 +65,7 @@ func createAlertmanagerAndSendAlerts(t *testing.T, alertGroups, groupsLimit, exp
 		TenantDataDir:     t.TempDir(),
 		ExternalURL:       &url.URL{Path: "/am"},
 		ShardingEnabled:   true,
+		Store:             prepareInMemoryAlertStore(),
 		Replicator:        &stubReplicator{},
 		ReplicationFactor: 1,
 		// We have to set this interval non-zero, though we don't need the persister to do anything.
@@ -148,6 +149,7 @@ func TestDispatcherLoggerInsightKey(t *testing.T) {
 		TenantDataDir:     t.TempDir(),
 		ExternalURL:       &url.URL{Path: "/am"},
 		ShardingEnabled:   true,
+		Store:             prepareInMemoryAlertStore(),
 		Replicator:        &stubReplicator{},
 		ReplicationFactor: 1,
 		PersisterConfig:   PersisterConfig{Interval: time.Hour},

pkg/alertmanager/state_replication.go

Lines changed: 25 additions & 27 deletions
@@ -199,38 +199,36 @@ func (s *state) starting(ctx context.Context) error {
 	timer := prometheus.NewTimer(s.initialSyncDuration)
 	defer timer.ObserveDuration()
 
-	level.Info(s.logger).Log("msg", "Waiting for notification and silences to settle...")
-
-	// If the replication factor is <= 1, there is nowhere to obtain the state from.
-	if s.replicationFactor <= 1 {
-		level.Info(s.logger).Log("msg", "skipping settling (no replicas)")
-		return nil
-	}
-
-	// We can check other alertmanager(s) and explicitly ask them to propagate their state to us if available.
-	readCtx, cancel := context.WithTimeout(ctx, s.settleReadTimeout)
-	defer cancel()
+	// If replication factor is > 1 attempt to read state from other replicas, falling back to reading from
+	// storage if they are unavailable.
+	if s.replicationFactor > 1 {
+		level.Info(s.logger).Log("msg", "Waiting for notification and silences to settle...")
+
+		// We can check other alertmanager(s) and explicitly ask them to propagate their state to us if available.
+		readCtx, cancel := context.WithTimeout(ctx, s.settleReadTimeout)
+		defer cancel()
+
+		s.fetchReplicaStateTotal.Inc()
+		fullStates, err := s.replicator.ReadFullStateForUser(readCtx, s.userID)
+		if err == nil {
+			if err = s.mergeFullStates(fullStates); err == nil {
+				level.Info(s.logger).Log("msg", "state settled; proceeding")
+				s.initialSyncCompleted.WithLabelValues(syncFromReplica).Inc()
+				return nil
+			}
+		}
 
-	s.fetchReplicaStateTotal.Inc()
-	fullStates, err := s.replicator.ReadFullStateForUser(readCtx, s.userID)
-	if err == nil {
-		if err = s.mergeFullStates(fullStates); err == nil {
-			level.Info(s.logger).Log("msg", "state settled; proceeding")
-			s.initialSyncCompleted.WithLabelValues(syncFromReplica).Inc()
-			return nil
+		// The user not being found in all of the replicas is not recorded as a failure, as this is
+		// expected when this is the first replica to come up for a user. Note that it is important
+		// to continue and try to read from the state from remote storage, as the replicas may have
+		// lost state due to an all-replica restart.
+		if err != errAllReplicasUserNotFound {
+			s.fetchReplicaStateFailed.Inc()
 		}
-	}
 
-	// The user not being found in all of the replicas is not recorded as a failure, as this is
-	// expected when this is the first replica to come up for a user. Note that it is important
-	// to continue and try to read from the state from remote storage, as the replicas may have
-	// lost state due to an all-replica restart.
-	if err != errAllReplicasUserNotFound {
-		s.fetchReplicaStateFailed.Inc()
+		level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
 	}
 
-	level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
-
 	// Attempt to read the state from persistent storage instead.
 	storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
 	defer cancel()
pkg/alertmanager/state_replication_test.go

Lines changed: 42 additions & 43 deletions
@@ -96,31 +96,39 @@ func newFakeAlertStore() *fakeAlertStore {
 	}
 }
 
-func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
+func (f *fakeAlertStore) GetFullState(_ context.Context, user string) (alertspb.FullStateDesc, error) {
 	if result, ok := f.states[user]; ok {
 		return result, nil
 	}
 	return alertspb.FullStateDesc{}, alertspb.ErrNotFound
 }
 
+func (f *fakeAlertStore) SetFullState(_ context.Context, user string, state alertspb.FullStateDesc) error {
+	f.states[user] = state
+	return nil
+}
+
 func TestStateReplication(t *testing.T) {
 	tc := []struct {
-		name              string
-		replicationFactor int
-		message           *clusterpb.Part
-		results           map[string]*clusterpb.Part
+		name               string
+		replicationFactor  int
+		message            *clusterpb.Part
+		replicationResults map[string]clusterpb.Part
+		storeResults       map[string]clusterpb.Part
 	}{
 		{
-			name:              "with a replication factor of <= 1, state is not replicated.",
-			replicationFactor: 1,
-			message:           &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
-			results:           map[string]*clusterpb.Part{},
+			name:               "with a replication factor of <= 1, state is not replicated but loaded from storage.",
+			replicationFactor:  1,
+			message:            &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
+			replicationResults: map[string]clusterpb.Part{},
+			storeResults:       map[string]clusterpb.Part{"user-1": {Key: "nflog", Data: []byte("OK")}},
 		},
 		{
-			name:              "with a replication factor of > 1, state is broadcasted for replication.",
-			replicationFactor: 3,
-			message:           &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
-			results:           map[string]*clusterpb.Part{"user-1": {Key: "nflog", Data: []byte("OK")}},
+			name:               "with a replication factor of > 1, state is broadcasted for replication.",
+			replicationFactor:  3,
+			message:            &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
+			replicationResults: map[string]clusterpb.Part{"user-1": {Key: "nflog", Data: []byte("OK")}},
+			storeResults:       map[string]clusterpb.Part{},
 		},
 	}
 
@@ -129,9 +137,15 @@ func TestStateReplication(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
 			replicator := newFakeReplicator()
 			replicator.read = readStateResult{res: nil, err: nil}
+
 			store := newFakeAlertStore()
-			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
+			for user, part := range tt.storeResults {
+				require.NoError(t, store.SetFullState(context.Background(), user, alertspb.FullStateDesc{
+					State: &clusterpb.FullState{Parts: []clusterpb.Part{part}},
+				}))
+			}
 
+			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
 			require.False(t, s.Ready())
 			{
 				ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
@@ -161,47 +175,32 @@
 			require.Eventually(t, func() bool {
 				replicator.mtx.Lock()
 				defer replicator.mtx.Unlock()
-				return len(replicator.results) == len(tt.results)
+				return len(replicator.results) == len(tt.replicationResults)
 			}, time.Second, time.Millisecond)
 
 			if tt.replicationFactor > 1 {
+				// If the replication factor is greater than 1, we expect state to be loaded from other Alertmanagers
 				assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
-# HELP alertmanager_state_fetch_replica_state_failed_total Number of times we have failed to read and merge the full state from another replica.
-# TYPE alertmanager_state_fetch_replica_state_failed_total counter
-alertmanager_state_fetch_replica_state_failed_total 0
-# HELP alertmanager_state_fetch_replica_state_total Number of times we have tried to read and merge the full state from another replica.
-# TYPE alertmanager_state_fetch_replica_state_total counter
-alertmanager_state_fetch_replica_state_total 1
-# HELP alertmanager_partial_state_merges_failed_total Number of times we have failed to merge a partial state received for a key.
-# TYPE alertmanager_partial_state_merges_failed_total counter
-alertmanager_partial_state_merges_failed_total{key="nflog"} 0
-# HELP alertmanager_partial_state_merges_total Number of times we have received a partial state to merge for a key.
-# TYPE alertmanager_partial_state_merges_total counter
-alertmanager_partial_state_merges_total{key="nflog"} 0
 # HELP alertmanager_state_initial_sync_completed_total Number of times we have completed syncing initial state for each possible outcome.
 # TYPE alertmanager_state_initial_sync_completed_total counter
 alertmanager_state_initial_sync_completed_total{outcome="failed"} 0
 alertmanager_state_initial_sync_completed_total{outcome="from-replica"} 1
 alertmanager_state_initial_sync_completed_total{outcome="from-storage"} 0
 alertmanager_state_initial_sync_completed_total{outcome="user-not-found"} 0
-# HELP alertmanager_state_initial_sync_total Number of times we have tried to sync initial state from peers or remote storage.
-# TYPE alertmanager_state_initial_sync_total counter
-alertmanager_state_initial_sync_total 1
-# HELP alertmanager_state_replication_failed_total Number of times we have failed to replicate a state to other alertmanagers.
-# TYPE alertmanager_state_replication_failed_total counter
-alertmanager_state_replication_failed_total{key="nflog"} 0
-# HELP alertmanager_state_replication_total Number of times we have tried to replicate a state to other alertmanagers.
-# TYPE alertmanager_state_replication_total counter
-alertmanager_state_replication_total{key="nflog"} 1
 `),
-					"alertmanager_state_fetch_replica_state_failed_total",
-					"alertmanager_state_fetch_replica_state_total",
-					"alertmanager_partial_state_merges_failed_total",
-					"alertmanager_partial_state_merges_total",
 					"alertmanager_state_initial_sync_completed_total",
-					"alertmanager_state_initial_sync_total",
-					"alertmanager_state_replication_failed_total",
-					"alertmanager_state_replication_total",
+				))
+			} else {
+				// Replication factor is 1, we expect state to be loaded from storage *instead* of other Alertmanagers
+				assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
+# HELP alertmanager_state_initial_sync_completed_total Number of times we have completed syncing initial state for each possible outcome.
+# TYPE alertmanager_state_initial_sync_completed_total counter
+alertmanager_state_initial_sync_completed_total{outcome="failed"} 0
+alertmanager_state_initial_sync_completed_total{outcome="from-replica"} 0
+alertmanager_state_initial_sync_completed_total{outcome="from-storage"} 1
+alertmanager_state_initial_sync_completed_total{outcome="user-not-found"} 0
+`),
+					"alertmanager_state_initial_sync_completed_total",
 				))
 
 			}