Commit 2dae12a

Alertmanager: Replicate state using the Ring (#3839)
* Alertmanager: Replicate state using the Ring

  Alertmanager typically uses the memberlist gossip-based protocol to replicate state across replicas. In Cortex, we used the same fundamentals to provide some sort of high availability mode. Now that we have support for sharding instances across many machines, we can leverage the ring to find the corresponding instances and send the updates via gRPC.

  Signed-off-by: gotjosh <[email protected]>

* Appease the linter and wordsmithing

  Signed-off-by: gotjosh <[email protected]>

* Always wait for the missing metrics

  Signed-off-by: gotjosh <[email protected]>
1 parent c48532e commit 2dae12a

11 files changed: +1305 −182 lines
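
For orientation before the per-file diffs, here is a minimal sketch of the idea in the commit message: ask the ring which replicas own a tenant, then push the partial state to each of them over gRPC. The ringLookup, stateClient, and Part names are illustrative stand-ins, not the actual Cortex types; in the commit itself this role is played by the new ReplicateStateFunc hook on the per-tenant Alertmanager Config, which works with clusterpb.Part.

package replication

import (
    "context"
    "fmt"
)

// Part is a stand-in for clusterpb.Part: an opaque chunk of silence/nflog state.
type Part struct {
    Key  string
    Data []byte
}

// ringLookup returns the addresses of the replicas that own a given tenant.
type ringLookup interface {
    ReplicasFor(userID string) ([]string, error)
}

// stateClient abstracts the gRPC call that pushes a state update to one replica.
type stateClient interface {
    UpdateState(ctx context.Context, addr, userID string, p *Part) error
}

// replicateState fans a partial state update out to every replica the ring
// says owns this tenant, mirroring what a ReplicateStateFunc would do.
func replicateState(ctx context.Context, ring ringLookup, client stateClient, userID string, p *Part) error {
    addrs, err := ring.ReplicasFor(userID)
    if err != nil {
        return fmt.Errorf("lookup replicas for %s: %w", userID, err)
    }
    for _, addr := range addrs {
        if err := client.UpdateState(ctx, addr, userID, p); err != nil {
            // The real implementation would count failures and keep going;
            // the sketch just surfaces the first error.
            return fmt.Errorf("replicate %s to %s: %w", p.Key, addr, err)
        }
    }
    return nil
}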

go.sum (−113): Large diffs are not rendered by default.

integration/alertmanager_test.go (+40 −20)
@@ -11,8 +11,11 @@ import (
     "testing"
     "time"
 
+    amlabels "github.com/prometheus/alertmanager/pkg/labels"
+    "github.com/prometheus/alertmanager/types"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     "github.com/cortexproject/cortex/integration/e2e"
@@ -290,31 +293,48 @@ func TestAlertmanagerSharding(t *testing.T) {
         require.NoError(t, s.StartAndWaitReady(am))
     }
 
-    for _, am := range alertmanagers.Instances() {
-        require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
-            labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
-            labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
-        )))
-
-        // We expect every instance to discover every configuration but only own a subset of them.
-        require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(30)), "cortex_alertmanager_tenants_discovered"))
-        // We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned.
-        require.NoError(t, am.WaitSumMetrics(e2e.Greater(float64(0)), "cortex_alertmanager_tenants_owned"))
-        require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(384)), "cortex_ring_tokens_total"))
+    require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(9), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+        labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
+        labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
+    )))
+
+    // We expect every instance to discover every configuration but only own a subset of them.
+    require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(90), "cortex_alertmanager_tenants_discovered"))
+    // We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned.
+    // The total number of tenants across all instances is: total alertmanager configs * replication factor.
+    // In this case: 30 * 2
+    require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(60), "cortex_alertmanager_tenants_owned"))
+    require.NoError(t, alertmanagers.WaitSumMetrics(e2e.Equals(float64(1152)), "cortex_ring_tokens_total"))
+
+    // Now, let's make sure state is replicated across instances.
+    // 1. Let's select a random tenant
+    userID := "user-5"
+
+    // 2. Let's create a silence
+    silence := types.Silence{
+        Matchers: amlabels.Matchers{
+            {Name: "instance", Value: "prometheus-one"},
+        },
+        Comment:  "Created for a test case.",
+        StartsAt: time.Now(),
+        EndsAt:   time.Now().Add(time.Hour),
     }
 
-    var totalTenants int
-    for _, am := range alertmanagers.Instances() {
-        values, err := am.SumMetrics([]string{"cortex_alertmanager_tenants_owned"})
-        require.NoError(t, err)
+    // 2b. For each tenant, with a replication factor of 2 and 3 instances there's a chance the user might not be in the first selected replica.
+    c1, err := e2ecortex.NewClient("", "", alertmanager1.HTTPEndpoint(), "", userID)
+    require.NoError(t, err)
+    c2, err := e2ecortex.NewClient("", "", alertmanager2.HTTPEndpoint(), "", userID)
+    require.NoError(t, err)
 
-        tenants := int(e2e.SumValues(values))
-        totalTenants += tenants
+    err = c1.CreateSilence(context.Background(), silence)
+    if err != nil {
+        err := c2.CreateSilence(context.Background(), silence)
+        require.NoError(t, err)
+    } else {
+        require.NoError(t, err)
     }
 
-    // The total number of tenants across all instances is: total alertmanager configs * replication factor.
-    // In this case: 30 * 2
-    require.Equal(t, 60, totalTenants)
+    assert.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(float64(2)), []string{"cortex_alertmanager_silences"}), e2e.WaitMissingMetrics)
     })
 }
 }
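
The changed expectations follow from summing metrics across the whole group of replicas instead of asserting them instance by instance: with 3 Alertmanager replicas, the 3 ACTIVE ring members seen by each instance sum to 9, the 30 configurations discovered per instance sum to 90, and the 384 ring tokens reported per instance sum to 1152. Tenants owned is 30 configurations times a replication factor of 2, i.e. 60, as the in-test comment notes. The silence check expects 2 because, with that replication factor, exactly two replicas should end up holding the replicated silence.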

integration/e2ecortex/client.go (+25)
@@ -447,6 +447,31 @@ func (c *Client) SendAlertToAlermanager(ctx context.Context, alert *model.Alert)
     return nil
 }
 
+func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) error {
+    u := c.alertmanagerClient.URL("api/prom/api/v1/silences", nil)
+
+    data, err := json.Marshal(silence)
+    if err != nil {
+        return fmt.Errorf("error marshaling the silence: %s", err)
+    }
+
+    req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(data))
+    if err != nil {
+        return fmt.Errorf("error creating request: %v", err)
+    }
+
+    resp, body, err := c.alertmanagerClient.Do(ctx, req)
+    if err != nil {
+        return err
+    }
+
+    if resp.StatusCode != http.StatusOK {
+        return fmt.Errorf("creating the silence failed with status %d and error %v", resp.StatusCode, string(body))
+    }
+
+    return nil
+}
+
 func (c *Client) PostRequest(url string, body io.Reader) (*http.Response, error) {
     req, err := http.NewRequest("POST", url, body)
     if err != nil {
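
As a usage sketch, the new helper can be pointed at any Alertmanager replica. The endpoint address and tenant below are placeholders, and the silence payload mirrors the one used in the sharding test; treat this as illustrative rather than part of the commit.

package example

import (
    "context"
    "fmt"
    "time"

    amlabels "github.com/prometheus/alertmanager/pkg/labels"
    "github.com/prometheus/alertmanager/types"

    "github.com/cortexproject/cortex/integration/e2ecortex"
)

// createTestSilence creates a one-hour silence for a tenant via the e2e client.
// amAddr and userID are hypothetical values supplied by the caller.
func createTestSilence(amAddr, userID string) error {
    client, err := e2ecortex.NewClient("", "", amAddr, "", userID)
    if err != nil {
        return err
    }

    silence := types.Silence{
        Matchers: amlabels.Matchers{
            {Name: "instance", Value: "prometheus-one"},
        },
        Comment:  "Created outside the test for illustration.",
        StartsAt: time.Now(),
        EndsAt:   time.Now().Add(time.Hour),
    }

    // CreateSilence POSTs to api/prom/api/v1/silences and fails on any non-200 response.
    if err := client.CreateSilence(context.Background(), silence); err != nil {
        return fmt.Errorf("creating silence: %w", err)
    }
    return nil
}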

pkg/alertmanager/alertmanager.go (+54 −15)
@@ -14,8 +14,10 @@ import (
     "time"
 
     "github.com/go-kit/kit/log"
+    "github.com/go-kit/kit/log/level"
     "github.com/prometheus/alertmanager/api"
     "github.com/prometheus/alertmanager/cluster"
+    "github.com/prometheus/alertmanager/cluster/clusterpb"
     "github.com/prometheus/alertmanager/config"
     "github.com/prometheus/alertmanager/dispatch"
     "github.com/prometheus/alertmanager/inhibit"
@@ -53,13 +55,21 @@ type Config struct {
     PeerTimeout time.Duration
     Retention   time.Duration
     ExternalURL *url.URL
+
+    ShardingEnabled    bool
+    ReplicationFactor  int
+    ReplicateStateFunc func(context.Context, string, *clusterpb.Part) error
+    // The alertmanager replication protocol relies on a position related to other replicas.
+    // This position is then used to identify who should notify about the alert first.
+    GetPositionFunc func(userID string) int
 }
 
 // An Alertmanager manages the alerts for one user.
 type Alertmanager struct {
     cfg      *Config
     api      *api.API
     logger   log.Logger
+    state    State
     nflog    *nflog.Log
     silences *silence.Silences
     marker   types.Marker
@@ -96,6 +106,13 @@ func init() {
     }()
 }
 
+// State helps with replication and synchronization of notifications and silences across several alertmanager replicas.
+type State interface {
+    AddState(string, cluster.State, prometheus.Registerer) cluster.ClusterChannel
+    Position() int
+    WaitReady()
+}
+
 // New creates a new Alertmanager.
 func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
     am := &Alertmanager{
@@ -110,6 +127,22 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 
     am.registry = reg
 
+    // We currently have 3 operational modes:
+    // 1) Alertmanager clustering with upstream Gossip
+    // 2) Alertmanager sharding and ring-based replication
+    // 3) Alertmanager no replication
+    // These are covered in order.
+    if cfg.Peer != nil {
+        level.Debug(am.logger).Log("msg", "starting tenant alertmanager with gossip-based replication")
+        am.state = cfg.Peer
+    } else if cfg.ShardingEnabled {
+        level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
+        am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.stop, am.logger, am.registry)
+    } else {
+        level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
+        am.state = &NilPeer{}
+    }
+
     am.wg.Add(1)
     nflogID := fmt.Sprintf("nflog:%s", cfg.UserID)
     var err error
@@ -123,10 +156,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
     if err != nil {
         return nil, fmt.Errorf("failed to create notification log: %v", err)
     }
-    if cfg.Peer != nil {
-        c := cfg.Peer.AddState("nfl:"+cfg.UserID, am.nflog, am.registry)
-        am.nflog.SetBroadcast(c.Broadcast)
-    }
+
+    c := am.state.AddState("nfl:"+cfg.UserID, am.nflog, am.registry)
+    am.nflog.SetBroadcast(c.Broadcast)
 
     am.marker = types.NewMarker(am.registry)
 
@@ -140,10 +172,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
     if err != nil {
         return nil, fmt.Errorf("failed to create silences: %v", err)
     }
-    if cfg.Peer != nil {
-        c := cfg.Peer.AddState("sil:"+cfg.UserID, am.silences, am.registry)
-        am.silences.SetBroadcast(c.Broadcast)
-    }
+
+    c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry)
+    am.silences.SetBroadcast(c.Broadcast)
 
     am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)
 
@@ -162,9 +193,10 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
         Alerts:     am.alerts,
         Silences:   am.silences,
         StatusFunc: am.marker.Status,
-        Peer:       &NilPeer{},
-        Registry:   am.registry,
-        Logger:     log.With(am.logger, "component", "api"),
+        // Cortex should not expose cluster information back to its tenants.
+        Peer:     &NilPeer{},
+        Registry: am.registry,
+        Logger:   log.With(am.logger, "component", "api"),
         GroupFunc: func(f1 func(*dispatch.Route) bool, f2 func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
             return am.dispatcher.Groups(f1, f2)
         },
@@ -190,14 +222,16 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
     }
 
     am.dispatcherMetrics = dispatch.NewDispatcherMetrics(am.registry)
+
+    //TODO: From this point onward, the alertmanager _might_ receive requests - we need to make sure we've settled and are ready.
     return am, nil
 }
 
 // clusterWait returns a function that inspects the current peer state and returns
 // a duration of one base timeout for each peer with a higher ID than ourselves.
-func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration {
+func clusterWait(position func() int, timeout time.Duration) func() time.Duration {
     return func() time.Duration {
-        return time.Duration(p.Position()) * timeout
+        return time.Duration(position()) * timeout
     }
 }
 
@@ -230,7 +264,8 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
 
     am.inhibitor = inhibit.NewInhibitor(am.alerts, conf.InhibitRules, am.marker, log.With(am.logger, "component", "inhibitor"))
 
-    waitFunc := clusterWait(am.cfg.Peer, am.cfg.PeerTimeout)
+    waitFunc := clusterWait(am.state.Position, am.cfg.PeerTimeout)
+
     timeoutFunc := func(d time.Duration) time.Duration {
         if d < notify.MinTimeout {
             d = notify.MinTimeout
@@ -255,7 +290,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
         silence.NewSilencer(am.silences, am.marker, am.logger),
         muteTimes,
         am.nflog,
-        am.cfg.Peer,
+        am.state,
     )
     am.dispatcher = dispatch.NewDispatcher(
         am.alerts,
@@ -293,6 +328,10 @@ func (am *Alertmanager) StopAndWait() {
     am.wg.Wait()
 }
 
+func (am *Alertmanager) mergePartialExternalState(part *clusterpb.Part) error {
+    return am.state.(*state).MergePartialState(part)
+}
+
 // buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
 // list of receiver config.
 func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, logger log.Logger) (map[string][]notify.Integration, error) {
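
To see what the clusterWait change buys, the snippet below replays the new position-based wait outside the Alertmanager. The clusterWait body is the one added in this diff; the 15-second peer timeout is a hypothetical value for illustration. The wait grows linearly with the replica's position, so the replica at position 0 notifies first and higher positions only act if the earlier ones fail to.

package example

import (
    "fmt"
    "time"
)

// clusterWait as changed in this commit: one base timeout per position ahead of us.
func clusterWait(position func() int, timeout time.Duration) func() time.Duration {
    return func() time.Duration {
        return time.Duration(position()) * timeout
    }
}

// demoClusterWait prints the wait each replica position would observe.
func demoClusterWait() {
    const peerTimeout = 15 * time.Second // hypothetical value

    for pos := 0; pos < 3; pos++ {
        pos := pos // capture loop variable for the closure
        wait := clusterWait(func() int { return pos }, peerTimeout)
        fmt.Printf("replica at position %d waits %s before notifying\n", pos, wait())
    }
    // Position 0 waits 0s, position 1 waits 15s, position 2 waits 30s.
}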

pkg/alertmanager/alertmanager_metrics.go (+31 −1)
@@ -46,6 +46,11 @@ type alertmanagerMetrics struct {
 
     // The alertmanager config hash.
     configHashValue *prometheus.Desc
+
+    partialMerges       *prometheus.Desc
+    partialMergesFailed *prometheus.Desc
+    replicationTotal    *prometheus.Desc
+    replicationFailed   *prometheus.Desc
 }
 
 func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -147,6 +152,22 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
             "cortex_alertmanager_config_hash",
             "Hash of the currently loaded alertmanager configuration.",
             []string{"user"}, nil),
+        partialMerges: prometheus.NewDesc(
+            "cortex_alertmanager_partial_state_merges_total",
+            "Number of times we have received a partial state to merge for a key.",
+            []string{"key"}, nil),
+        partialMergesFailed: prometheus.NewDesc(
+            "cortex_alertmanager_partial_state_merges_failed_total",
+            "Number of times we have failed to merge a partial state received for a key.",
+            []string{"key"}, nil),
+        replicationTotal: prometheus.NewDesc(
+            "cortex_alertmanager_state_replication_total",
+            "Number of times we have tried to replicate a state to other alertmanagers",
+            []string{"key"}, nil),
+        replicationFailed: prometheus.NewDesc(
+            "cortex_alertmanager_state_replication_failed_total",
+            "Number of times we have failed to replicate a state to other alertmanagers",
+            []string{"key"}, nil),
     }
 }
 
@@ -155,7 +176,7 @@ func (m *alertmanagerMetrics) addUserRegistry(user string, reg *prometheus.Regis
 }
 
 func (m *alertmanagerMetrics) removeUserRegistry(user string) {
-    // We neeed to go for a soft deletion here, as hard deletion requires
+    // We need to go for a soft deletion here, as hard deletion requires
     // that _all_ metrics except gauges are per-user.
     m.regs.RemoveUserRegistry(user, false)
 }
@@ -185,6 +206,10 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
     out <- m.silencesPropagatedMessagesTotal
     out <- m.silences
     out <- m.configHashValue
+    out <- m.partialMerges
+    out <- m.partialMergesFailed
+    out <- m.replicationTotal
+    out <- m.replicationFailed
 }
 
 func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -218,4 +243,9 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
     data.SendSumOfGaugesPerUserWithLabels(out, m.silences, "alertmanager_silences", "state")
 
     data.SendMaxOfGaugesPerUser(out, m.configHashValue, "alertmanager_config_hash")
+
+    data.SendSumOfCountersWithLabels(out, m.partialMerges, "alertmanager_partial_state_merges_total", "key")
+    data.SendSumOfCountersWithLabels(out, m.partialMergesFailed, "alertmanager_partial_state_merges_failed_total", "key")
+    data.SendSumOfCountersWithLabels(out, m.replicationTotal, "alertmanager_state_replication_total", "key")
+    data.SendSumOfCountersWithLabels(out, m.replicationFailed, "alertmanager_state_replication_failed_total", "key")
 }
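
The cortex_alertmanager_* series above are aggregated from per-tenant registries (the Send*PerUser/WithLabels helpers sum the underlying alertmanager_* metrics). The sketch below shows one plausible way those per-tenant counters could be produced by a state implementation; the replicationMetrics helper and its wiring are assumptions for illustration, not the code added in this commit.

package example

import "github.com/prometheus/client_golang/prometheus"

// replicationMetrics holds the per-tenant counters that the aggregator would
// later sum into cortex_alertmanager_state_replication_*.
type replicationMetrics struct {
    replicationTotal  *prometheus.CounterVec
    replicationFailed *prometheus.CounterVec
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
    m := &replicationMetrics{
        replicationTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "alertmanager_state_replication_total",
            Help: "Number of times we have tried to replicate a state to other alertmanagers",
        }, []string{"key"}),
        replicationFailed: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "alertmanager_state_replication_failed_total",
            Help: "Number of times we have failed to replicate a state to other alertmanagers",
        }, []string{"key"}),
    }
    reg.MustRegister(m.replicationTotal, m.replicationFailed)
    return m
}

// recordReplication would be called once per broadcast attempt for a state key
// such as "sil:user-5" or "nfl:user-5".
func (m *replicationMetrics) recordReplication(key string, err error) {
    m.replicationTotal.WithLabelValues(key).Inc()
    if err != nil {
        m.replicationFailed.WithLabelValues(key).Inc()
    }
}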
