Skip to content

Commit b584b82

Browse files
committed
Code review changes
Signed-off-by: Nick Pillitteri <[email protected]>
1 parent 9bb1ed6 commit b584b82

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

pkg/alertmanager/state_replication.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ type state struct {
7171
func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state {
7272

7373
s := &state{
74-
logger: l,
74+
logger: log.With(l, "user", userID),
7575
userID: userID,
7676
replicationFactor: rf,
7777
replicator: re,
@@ -226,16 +226,17 @@ func (s *state) starting(ctx context.Context) error {
226226
s.fetchReplicaStateFailed.Inc()
227227
}
228228

229-
level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
229+
level.Info(s.logger).Log("msg", "unable to read state from other Alertmanager replicas; trying to read from storage", "err", err)
230230
}
231231

232+
level.Info(s.logger).Log("msg", "reading state from storage")
232233
// Attempt to read the state from persistent storage instead.
233234
storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
234235
defer cancel()
235236

236237
fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
237238
if errors.Is(err, alertspb.ErrNotFound) {
238-
level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID)
239+
level.Info(s.logger).Log("msg", "no state for user in storage; proceeding")
239240
s.initialSyncCompleted.WithLabelValues(syncUserNotFound).Inc()
240241
return nil
241242
}
@@ -269,11 +270,11 @@ func (s *state) mergeFullStates(fs []*clusterpb.FullState) error {
269270

270271
for _, f := range fs {
271272
for _, p := range f.Parts {
272-
level.Debug(s.logger).Log("msg", "merging full state", "user", s.userID, "key", p.Key, "bytes", len(p.Data))
273+
level.Debug(s.logger).Log("msg", "merging full state", "key", p.Key, "bytes", len(p.Data))
273274

274275
st, ok := s.states[p.Key]
275276
if !ok {
276-
level.Error(s.logger).Log("msg", "key not found while merging full state", "user", s.userID, "key", p.Key)
277+
level.Error(s.logger).Log("msg", "key not found while merging full state", "key", p.Key)
277278
continue
278279
}
279280

@@ -298,7 +299,7 @@ func (s *state) running(ctx context.Context) error {
298299
s.stateReplicationTotal.WithLabelValues(p.Key).Inc()
299300
if err := s.replicator.ReplicateStateForUser(ctx, s.userID, p); err != nil {
300301
s.stateReplicationFailed.WithLabelValues(p.Key).Inc()
301-
level.Error(s.logger).Log("msg", "failed to replicate state to other alertmanagers", "user", s.userID, "key", p.Key, "err", err)
302+
level.Error(s.logger).Log("msg", "failed to replicate state to other alertmanagers", "key", p.Key, "err", err)
302303
}
303304
case <-ctx.Done():
304305
return nil

pkg/alertmanager/state_replication_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/grafana/mimir/pkg/alertmanager/alertstore"
3030
)
3131

32+
const testUserID = "user-1"
33+
3234
type fakeState struct {
3335
binary []byte
3436
merges [][]byte
@@ -73,8 +75,8 @@ func (f *fakeReplicator) GetPositionForUser(_ string) int {
7375
}
7476

7577
func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
76-
if userID != "user-1" {
77-
return nil, errors.New("Unexpected userID")
78+
if userID != testUserID {
79+
return nil, errors.New("unexpected userID")
7880
}
7981

8082
if f.read.blocking {
@@ -121,13 +123,13 @@ func TestStateReplication(t *testing.T) {
121123
replicationFactor: 1,
122124
message: &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
123125
replicationResults: map[string]clusterpb.Part{},
124-
storeResults: map[string]clusterpb.Part{"user-1": {Key: "nflog", Data: []byte("OK")}},
126+
storeResults: map[string]clusterpb.Part{testUserID: {Key: "nflog", Data: []byte("OK")}},
125127
},
126128
{
127129
name: "with a replication factor of > 1, state is broadcasted for replication.",
128130
replicationFactor: 3,
129131
message: &clusterpb.Part{Key: "nflog", Data: []byte("OK")},
130-
replicationResults: map[string]clusterpb.Part{"user-1": {Key: "nflog", Data: []byte("OK")}},
132+
replicationResults: map[string]clusterpb.Part{testUserID: {Key: "nflog", Data: []byte("OK")}},
131133
storeResults: map[string]clusterpb.Part{},
132134
},
133135
}
@@ -145,7 +147,7 @@ func TestStateReplication(t *testing.T) {
145147
}))
146148
}
147149

148-
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
150+
s := newReplicatedStates(testUserID, tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
149151
require.False(t, s.Ready())
150152
{
151153
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)

0 commit comments

Comments
 (0)