
Commit 77afe99

Implement ReadFullStateForUser in MultitenantAlertmanager.
Signed-off-by: Steve Simpson <[email protected]>
1 parent aed52b3 · commit 77afe99


5 files changed (+317, -9 lines)


pkg/alertmanager/alertmanager.go

Lines changed: 8 additions & 7 deletions
@@ -161,13 +161,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
         am.state = cfg.Peer
     } else if cfg.ShardingEnabled {
         level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
-        state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
-
-        if err := state.Service.StartAsync(context.Background()); err != nil {
-            return nil, errors.Wrap(err, "failed to start ring-based replication service")
-        }
-
-        am.state = state
+        am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
     } else {
         level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
         am.state = &NilPeer{}
@@ -205,6 +199,13 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
     c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry)
     am.silences.SetBroadcast(c.Broadcast)
 
+    // State replication needs to be started after the state keys are defined.
+    if service, ok := am.state.(services.Service); ok {
+        if err := service.StartAsync(context.Background()); err != nil {
+            return nil, errors.Wrap(err, "failed to start ring-based replication service")
+        }
+    }
+
     am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)
 
     am.wg.Add(1)
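
The behavioural change in this file is ordering: the replicated state is no longer started inside the constructor branch, but only after the state keys (notification log and silences, the latter visible as "sil:"+cfg.UserID above) have been registered via AddState, and only when the configured state actually implements services.Service, leaving the gossip Peer and NilPeer paths untouched. Below is a standalone sketch of that conditional-start pattern; state, service, replicatedState and nilPeer are simplified stand-ins invented for illustration, not the real Cortex types.

package main

import (
    "context"
    "fmt"
)

// state is a stand-in for the cluster state interface the Alertmanager uses.
type state interface {
    addState(key string)
}

// service is a stand-in for the services.Service interface (only StartAsync is modelled).
type service interface {
    StartAsync(ctx context.Context) error
}

// replicatedState stands in for the ring-based replicatedStates type: it is both a state and a service.
type replicatedState struct{ keys []string }

func (r *replicatedState) addState(key string) { r.keys = append(r.keys, key) }

func (r *replicatedState) StartAsync(_ context.Context) error {
    fmt.Println("starting replication for keys:", r.keys)
    return nil
}

// nilPeer stands in for NilPeer: a state that is not a service.
type nilPeer struct{}

func (nilPeer) addState(string) {}

// registerAndStart mirrors the ordering introduced by the commit: register the
// state keys first, then start replication only if the state is also a service.
func registerAndStart(ctx context.Context, s state) error {
    s.addState("nfl:user-1")
    s.addState("sil:user-1")
    if svc, ok := s.(service); ok {
        return svc.StartAsync(ctx)
    }
    return nil
}

func main() {
    _ = registerAndStart(context.Background(), &replicatedState{}) // starts replication
    _ = registerAndStart(context.Background(), nilPeer{})          // no-op: not a service
}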

pkg/alertmanager/multitenant.go

Lines changed: 72 additions & 2 deletions
@@ -35,6 +35,7 @@ import (
     "github.com/cortexproject/cortex/pkg/ring/kv"
     "github.com/cortexproject/cortex/pkg/tenant"
     "github.com/cortexproject/cortex/pkg/util"
+    "github.com/cortexproject/cortex/pkg/util/concurrency"
     "github.com/cortexproject/cortex/pkg/util/flagext"
     util_log "github.com/cortexproject/cortex/pkg/util/log"
     "github.com/cortexproject/cortex/pkg/util/services"
@@ -1011,8 +1012,77 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
     return err
 }
 
-func (am *MultitenantAlertmanager) ReadFullStateForUser(context.Context, string) ([]*clusterpb.FullState, error) {
-    return nil, nil
+// ReadFullStateForUser attempts to read the full state from each replica for user. Note that it will try to obtain and return
+// state from all replicas, but will consider it a success if state is obtained from at least one replica.
+func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
+
+    // Only get the set of replicas which contain the specified user.
+    key := shardByUser(userID)
+    replicationSet, err := am.ring.Get(key, RingOp, nil, nil, nil)
+    if err != nil {
+        return nil, err
+    }
+
+    // We should only query state from other replicas, and not our own state.
+    addrs := replicationSet.GetAddressesWithout(am.ringLifecycler.GetInstanceAddr())
+
+    var (
+        resultsMtx      sync.Mutex
+        results         []*clusterpb.FullState
+        numUserNotFound int
+    )
+
+    // Note that the jobs swallow the errors - this is because we want to give each replica a chance to respond.
+    jobs := concurrency.CreateJobsFromStrings(addrs)
+    err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
+        addr := job.(string)
+        level.Debug(am.logger).Log("msg", "contacting replica for full state", "user", userID, "addr", addr)
+
+        c, err := am.alertmanagerClientsPool.GetClientFor(addr)
+        if err != nil {
+            level.Error(am.logger).Log("msg", "failed to get rpc client", "err", err)
+            return nil
+        }
+
+        resp, err := c.ReadState(user.InjectOrgID(ctx, userID), &alertmanagerpb.ReadStateRequest{})
+        if err != nil {
+            level.Error(am.logger).Log("msg", "rpc reading state from replica failed", "addr", addr, "user", userID, "err", err)
+            return nil
+        }
+
+        switch resp.Status {
+        case alertmanagerpb.READ_OK:
+            resultsMtx.Lock()
+            results = append(results, resp.State)
+            resultsMtx.Unlock()
+        case alertmanagerpb.READ_ERROR:
+            level.Error(am.logger).Log("msg", "error trying to read state", "addr", addr, "user", userID, "err", resp.Error)
+        case alertmanagerpb.READ_USER_NOT_FOUND:
+            level.Debug(am.logger).Log("msg", "user not found while trying to read state", "addr", addr, "user", userID)
+            resultsMtx.Lock()
+            numUserNotFound++
+            resultsMtx.Unlock()
+        default:
+            level.Error(am.logger).Log("msg", "unknown response trying to read state", "addr", addr, "user", userID)
+        }
+        return nil
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    // If all replicas reply with user not found, then we're not going to improve by retrying.
+    if numUserNotFound == len(addrs) {
+        level.Debug(am.logger).Log("msg", "user unknown in all replicas", "user", userID)
+        return []*clusterpb.FullState{}, nil
+    }
+
+    // We only require the state from a single replica, though we return as many as we were able to obtain.
+    if len(results) == 0 {
+        return nil, fmt.Errorf("failed to read state from any replica")
+    }
+
+    return results, nil
 }
 
 // UpdateState implements the Alertmanager service.
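
ReadFullStateForUser deliberately swallows per-replica errors inside the concurrency.ForEach callback so that every replica gets a chance to respond, and only fails afterwards if no replica produced any state; if every replica reports the user as not found, it returns an empty result and no error. The sketch below shows that fan-out-with-partial-success shape in a self-contained form using only the standard library; fullState, readReplica and errUserNotFound are hypothetical stand-ins for the real clusterpb/alertmanagerpb types and the ReadState RPC.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// fullState is a stand-in for clusterpb.FullState.
type fullState struct{ addr string }

var errUserNotFound = errors.New("user not found")

// readReplica is a stand-in for the ReadState RPC issued to one replica.
func readReplica(ctx context.Context, addr, userID string) (*fullState, error) {
    if addr == "127.0.0.2" {
        return nil, errUserNotFound
    }
    return &fullState{addr: addr}, nil
}

// readFullState queries every replica concurrently and succeeds if at least one
// replica returns state; it only reports "user unknown" if every replica agrees.
func readFullState(ctx context.Context, addrs []string, userID string) ([]*fullState, error) {
    var (
        mtx             sync.Mutex
        results         []*fullState
        numUserNotFound int
        wg              sync.WaitGroup
    )

    for _, addr := range addrs {
        wg.Add(1)
        go func(addr string) {
            defer wg.Done()

            state, err := readReplica(ctx, addr, userID)

            mtx.Lock()
            defer mtx.Unlock()
            switch {
            case err == nil:
                results = append(results, state)
            case errors.Is(err, errUserNotFound):
                numUserNotFound++
            default:
                // Per-replica failures are swallowed so the other replicas
                // still get a chance to respond.
            }
        }(addr)
    }
    wg.Wait()

    if numUserNotFound == len(addrs) {
        return []*fullState{}, nil // user unknown everywhere: empty state, not an error
    }
    if len(results) == 0 {
        return nil, fmt.Errorf("failed to read state from any replica")
    }
    return results, nil
}

func main() {
    states, err := readFullState(context.Background(), []string{"127.0.0.2", "127.0.0.3"}, "u-1")
    fmt.Println(len(states), err) // 1 <nil>
}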

pkg/alertmanager/multitenant_test.go

Lines changed: 183 additions & 0 deletions
@@ -1205,6 +1205,185 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) {
     }
 }
 
+func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testing.T) {
+    tc := []struct {
+        name              string
+        replicationFactor int
+    }{
+        {
+            name:              "RF = 2",
+            replicationFactor: 2,
+        },
+        {
+            name:              "RF = 3",
+            replicationFactor: 3,
+        },
+    }
+
+    for _, tt := range tc {
+        t.Run(tt.name, func(t *testing.T) {
+            ctx := context.Background()
+            ringStore := consul.NewInMemoryClient(ring.GetCodec())
+            mockStore := prepareInMemoryAlertStore()
+            clientPool := newPassthroughAlertmanagerClientPool()
+            externalURL := flagext.URLValue{}
+            err := externalURL.Set("http://localhost:8080/alertmanager")
+            require.NoError(t, err)
+
+            var instances []*MultitenantAlertmanager
+            var instanceIDs []string
+            registries := util.NewUserRegistries()
+
+            // Create only two users - no need for more for these test cases.
+            for i := 1; i <= 2; i++ {
+                u := fmt.Sprintf("u-%d", i)
+                require.NoError(t, mockStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
+                    User:      u,
+                    RawConfig: simpleConfigOne,
+                    Templates: []*alertspb.TemplateDesc{},
+                }))
+            }
+
+            createInstance := func(i int) *MultitenantAlertmanager {
+                instanceIDs = append(instanceIDs, fmt.Sprintf("alertmanager-%d", i))
+                instanceID := fmt.Sprintf("alertmanager-%d", i)
+
+                amConfig := mockAlertmanagerConfig(t)
+                amConfig.ExternalURL = externalURL
+                amConfig.ShardingRing.ReplicationFactor = tt.replicationFactor
+                amConfig.ShardingRing.InstanceID = instanceID
+                amConfig.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
+
+                // Do not check the ring topology changes or poll in an interval in this test (we explicitly sync alertmanagers).
+                amConfig.PollInterval = time.Hour
+                amConfig.ShardingRing.RingCheckPeriod = time.Hour
+
+                amConfig.ShardingEnabled = true
+
+                reg := prometheus.NewPedanticRegistry()
+                am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), reg)
+                require.NoError(t, err)
+
+                clientPool.servers[amConfig.ShardingRing.InstanceAddr+":0"] = am
+                am.alertmanagerClientsPool = clientPool
+
+                require.NoError(t, services.StartAndAwaitRunning(ctx, am))
+                t.Cleanup(func() {
+                    services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck
+                })
+
+                instances = append(instances, am)
+                instanceIDs = append(instanceIDs, instanceID)
+                registries.AddUserRegistry(instanceID, reg)
+
+                // Make sure the ring is settled.
+                {
+                    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+                    defer cancel()
+
+                    // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles.
+                    for _, am := range instances {
+                        for _, id := range instanceIDs {
+                            require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE))
+                        }
+                    }
+                }
+
+                // Now that the ring has settled, sync configs with the instances.
+                require.NoError(t, am.loadAndSyncConfigs(ctx, reasonRingChange))
+
+                return am
+            }
+
+            writeSilence := func(i *MultitenantAlertmanager, userID string) {
+                silence := types.Silence{
+                    Matchers: labels.Matchers{
+                        {Name: "instance", Value: "prometheus-one"},
+                    },
+                    Comment:  "Created for a test case.",
+                    StartsAt: time.Now(),
+                    EndsAt:   time.Now().Add(time.Hour),
+                }
+                data, err := json.Marshal(silence)
+                require.NoError(t, err)
+
+                req := httptest.NewRequest(http.MethodPost, externalURL.String()+"/api/v2/silences", bytes.NewReader(data))
+                req.Header.Set("content-type", "application/json")
+                reqCtx := user.InjectOrgID(req.Context(), userID)
+                {
+                    w := httptest.NewRecorder()
+                    i.ServeHTTP(w, req.WithContext(reqCtx))
+
+                    resp := w.Result()
+                    body, _ := ioutil.ReadAll(resp.Body)
+                    assert.Equal(t, http.StatusOK, w.Code)
+                    require.Regexp(t, regexp.MustCompile(`{"silenceID":".+"}`), string(body))
+                }
+            }
+
+            checkSilence := func(i *MultitenantAlertmanager, userID string) {
+                req := httptest.NewRequest(http.MethodGet, externalURL.String()+"/api/v2/silences", nil)
+                req.Header.Set("content-type", "application/json")
+                reqCtx := user.InjectOrgID(req.Context(), userID)
+                {
+                    w := httptest.NewRecorder()
+                    i.ServeHTTP(w, req.WithContext(reqCtx))
+
+                    resp := w.Result()
+                    body, _ := ioutil.ReadAll(resp.Body)
+                    assert.Equal(t, http.StatusOK, w.Code)
+                    require.Regexp(t, regexp.MustCompile(`"comment":"Created for a test case."`), string(body))
+                }
+            }
+
+            // 1. Create the first instance and load the user configurations.
+            i1 := createInstance(1)
+
+            // 2. Create a silence in the first alertmanager instance and check we can read it.
+            writeSilence(i1, "u-1")
+            // 2.a. Check the silence was created (paranoia).
+            checkSilence(i1, "u-1")
+            // 2.b. Check the relevant metrics were updated.
+            {
+                metrics := registries.BuildMetricFamiliesPerUser()
+                assert.Equal(t, float64(1), metrics.GetSumOfGauges("cortex_alertmanager_silences"))
+                assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_total"))
+                assert.Equal(t, float64(0), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_failed_total"))
+            }
+
+            // 3. Create a second instance. This should attempt to fetch the silence from the first.
+            i2 := createInstance(2)
+
+            // 3.a. Check the silence was fetched from the first instance successfully.
+            checkSilence(i2, "u-1")
+
+            // 3.b. Check the metrics: We should see the additional silences without any replication activity.
+            {
+                metrics := registries.BuildMetricFamiliesPerUser()
+                assert.Equal(t, float64(2), metrics.GetSumOfGauges("cortex_alertmanager_silences"))
+                assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_total"))
+                assert.Equal(t, float64(0), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_failed_total"))
+            }
+
+            if tt.replicationFactor >= 3 {
+                // 4. When testing RF = 3, create a third instance, to test obtaining state from multiple places.
+                i3 := createInstance(3)
+
+                // 4.a. Check the silence was fetched from one or both of the instances successfully.
+                checkSilence(i3, "u-1")
+
+                // 4.b. Check the metrics one more time. We should have three replicas of the silence.
+                {
+                    metrics := registries.BuildMetricFamiliesPerUser()
+                    assert.Equal(t, float64(3), metrics.GetSumOfGauges("cortex_alertmanager_silences"))
+                    assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_total"))
+                    assert.Equal(t, float64(0), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_failed_total"))
+                }
+            }
+        })
+    }
+}
+
 // prepareInMemoryAlertStore builds and returns an in-memory alert store.
 func prepareInMemoryAlertStore() alertstore.AlertStore {
     return bucketclient.NewBucketAlertStore(objstore.NewInMemBucket(), nil, log.NewNopLogger())
@@ -1251,6 +1430,10 @@ func (am *passthroughAlertmanagerClient) UpdateState(ctx context.Context, in *cl
     return am.server.UpdateState(ctx, in)
 }
 
+func (am *passthroughAlertmanagerClient) ReadState(ctx context.Context, in *alertmanagerpb.ReadStateRequest, opts ...grpc.CallOption) (*alertmanagerpb.ReadStateResponse, error) {
+    return am.server.ReadState(ctx, in)
+}
+
 func (am *passthroughAlertmanagerClient) HandleRequest(context.Context, *httpgrpc.HTTPRequest, ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) {
     return nil, fmt.Errorf("unexpected call to HandleRequest")
 }
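
The new test reuses the existing passthroughAlertmanagerClient, which satisfies the Alertmanager gRPC client interface by calling the paired in-process server object directly, so no listeners or network connections are involved; the second hunk simply adds a ReadState passthrough for the new RPC. Below is a rough sketch of that in-process passthrough idea; the request/response structs and interfaces are invented for illustration and do not match the generated alertmanagerpb types.

package main

import (
    "context"
    "fmt"
)

// readStateRequest/readStateResponse stand in for the generated protobuf messages.
type readStateRequest struct{}
type readStateResponse struct{ payload string }

// alertmanagerServer is the server-side interface a MultitenantAlertmanager would satisfy.
type alertmanagerServer interface {
    ReadState(ctx context.Context, in *readStateRequest) (*readStateResponse, error)
}

// passthroughClient satisfies the client-side interface by delegating straight
// to an in-process server, bypassing gRPC entirely (as the test's client pool does).
type passthroughClient struct {
    server alertmanagerServer
}

func (c *passthroughClient) ReadState(ctx context.Context, in *readStateRequest) (*readStateResponse, error) {
    return c.server.ReadState(ctx, in)
}

// fakeServer is a trivial server implementation used for the demonstration.
type fakeServer struct{}

func (fakeServer) ReadState(ctx context.Context, in *readStateRequest) (*readStateResponse, error) {
    return &readStateResponse{payload: "silences + notification log"}, nil
}

func main() {
    c := &passthroughClient{server: fakeServer{}}
    resp, err := c.ReadState(context.Background(), &readStateRequest{})
    fmt.Println(resp.payload, err)
}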

pkg/ring/replication_set.go

Lines changed: 12 additions & 0 deletions
@@ -115,6 +115,18 @@ func (r ReplicationSet) GetAddresses() []string {
     return addrs
 }
 
+// GetAddressesWithout returns the addresses of all instances within the replication set while
+// excluding the specified addresses. Returned slice order is not guaranteed.
+func (r ReplicationSet) GetAddressesWithout(exclude string) []string {
+    addrs := make([]string, 0, len(r.Instances))
+    for _, desc := range r.Instances {
+        if desc.Addr != exclude {
+            addrs = append(addrs, desc.Addr)
+        }
+    }
+    return addrs
+}
+
 // HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps),
 // false if they differ in any way (number of instances, instance states, tokens, zones, ...).
 func HasReplicationSetChanged(before, after ReplicationSet) bool {
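
GetAddressesWithout lets a replica ask the ring for the other members of its replication set while leaving out its own address, which is exactly how ReadFullStateForUser uses it in the diff above. A small self-contained usage sketch follows; the lower-case instanceDesc and replicationSet types only mirror the shape of ring.InstanceDesc and ring.ReplicationSet and are not the real package types.

package main

import "fmt"

// instanceDesc and replicationSet mirror the relevant shape of ring.InstanceDesc
// and ring.ReplicationSet for this sketch.
type instanceDesc struct{ Addr string }

type replicationSet struct{ Instances []instanceDesc }

// getAddressesWithout mirrors ReplicationSet.GetAddressesWithout: all addresses
// except the excluded one, in no guaranteed order.
func (r replicationSet) getAddressesWithout(exclude string) []string {
    addrs := make([]string, 0, len(r.Instances))
    for _, desc := range r.Instances {
        if desc.Addr != exclude {
            addrs = append(addrs, desc.Addr)
        }
    }
    return addrs
}

func main() {
    rs := replicationSet{Instances: []instanceDesc{
        {Addr: "127.0.0.1"}, {Addr: "127.0.0.2"}, {Addr: "127.0.0.3"},
    }}
    // A replica excludes its own address before fanning out state reads to its peers.
    fmt.Println(rs.getAddressesWithout("127.0.0.1")) // [127.0.0.2 127.0.0.3]
}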

pkg/ring/replication_set_test.go

Lines changed: 42 additions & 0 deletions
@@ -39,6 +39,48 @@ func TestReplicationSet_GetAddresses(t *testing.T) {
     }
 }
 
+func TestReplicationSet_GetAddressesWithout(t *testing.T) {
+    tests := map[string]struct {
+        rs       ReplicationSet
+        expected []string
+        exclude  string
+    }{
+        "should return an empty slice on empty replication set": {
+            rs:       ReplicationSet{},
+            expected: []string{},
+            exclude:  "127.0.0.1",
+        },
+        "non-matching exclusion, should return all addresses": {
+            rs: ReplicationSet{
+                Instances: []InstanceDesc{
+                    {Addr: "127.0.0.1"},
+                    {Addr: "127.0.0.2"},
+                    {Addr: "127.0.0.3"},
+                },
+            },
+            expected: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"},
+            exclude:  "127.0.0.4",
+        },
+        "matching exclusion, should return non-excluded addresses": {
+            rs: ReplicationSet{
+                Instances: []InstanceDesc{
+                    {Addr: "127.0.0.1"},
+                    {Addr: "127.0.0.2"},
+                    {Addr: "127.0.0.3"},
+                },
+            },
+            expected: []string{"127.0.0.1", "127.0.0.3"},
+            exclude:  "127.0.0.2",
+        },
+    }
+
+    for testName, testData := range tests {
+        t.Run(testName, func(t *testing.T) {
+            assert.ElementsMatch(t, testData.expected, testData.rs.GetAddressesWithout(testData.exclude))
+        })
+    }
+}
+
 var (
     errFailure     = errors.New("failed")
     errZoneFailure = errors.New("zone failed")
