First cut at metrics for alertmanager sharding operation. #4149

Merged (6 commits) on May 5, 2021
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,14 @@
* [CHANGE] Alertmanager: allowed to configure the experimental receivers firewall on a per-tenant basis. The following CLI flags (and their respective YAML config options) have been changed and moved to the limits config section: #4143
- `-alertmanager.receivers-firewall.block.cidr-networks` renamed to `-alertmanager.receivers-firewall-block-cidr-networks`
- `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
* [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149
* `cortex_alertmanager_state_fetch_replica_state_total`
* `cortex_alertmanager_state_fetch_replica_state_failed_total`
* `cortex_alertmanager_state_initial_sync_total`
* `cortex_alertmanager_state_initial_sync_completed_total`
* `cortex_alertmanager_state_initial_sync_duration_seconds`
* `cortex_alertmanager_state_persist_total`
* `cortex_alertmanager_state_persist_failed_total`

## 1.9.0 in progress

2 changes: 1 addition & 1 deletion pkg/alertmanager/alertmanager.go
@@ -171,7 +171,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
am.state = state
am.persister = newStatePersister(cfg.PersisterConfig, cfg.UserID, state, cfg.Store, am.logger)
am.persister = newStatePersister(cfg.PersisterConfig, cfg.UserID, state, cfg.Store, am.logger, am.registry)
} else {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
am.state = &NilPeer{}
57 changes: 53 additions & 4 deletions pkg/alertmanager/alertmanager_metrics.go
@@ -47,10 +47,17 @@ type alertmanagerMetrics struct {
// The alertmanager config hash.
configHashValue *prometheus.Desc

partialMerges *prometheus.Desc
partialMergesFailed *prometheus.Desc
replicationTotal *prometheus.Desc
replicationFailed *prometheus.Desc
partialMerges *prometheus.Desc
partialMergesFailed *prometheus.Desc
replicationTotal *prometheus.Desc
replicationFailed *prometheus.Desc
fetchReplicaStateTotal *prometheus.Desc
fetchReplicaStateFailed *prometheus.Desc
initialSyncTotal *prometheus.Desc
initialSyncCompleted *prometheus.Desc
initialSyncDuration *prometheus.Desc
persistTotal *prometheus.Desc
persistFailed *prometheus.Desc
}

func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -168,6 +175,34 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
"cortex_alertmanager_state_replication_failed_total",
"Number of times we have failed to replicate a state to other alertmanagers",
[]string{"user"}, nil),
fetchReplicaStateTotal: prometheus.NewDesc(
"cortex_alertmanager_state_fetch_replica_state_total",
"Number of times we have tried to read and merge the full state from another replica.",
nil, nil),
fetchReplicaStateFailed: prometheus.NewDesc(
"cortex_alertmanager_state_fetch_replica_state_failed_total",
"Number of times we have failed to read and merge the full state from another replica.",
nil, nil),
initialSyncTotal: prometheus.NewDesc(
"cortex_alertmanager_state_initial_sync_total",
"Number of times we have tried to sync initial state from peers or storage.",
nil, nil),
initialSyncCompleted: prometheus.NewDesc(
"cortex_alertmanager_state_initial_sync_completed_total",
"Number of times we have completed syncing initial state for each possible outcome.",
[]string{"outcome"}, nil),
initialSyncDuration: prometheus.NewDesc(
"cortex_alertmanager_state_initial_sync_duration_seconds",
"Time spent syncing initial state from peers or storage.",
nil, nil),
persistTotal: prometheus.NewDesc(
"cortex_alertmanager_state_persist_total",
"Number of times we have tried to persist the running state to storage.",
nil, nil),
persistFailed: prometheus.NewDesc(
"cortex_alertmanager_state_persist_failed_total",
"Number of times we have failed to persist the running state to storage.",
nil, nil),
}
}

@@ -210,6 +245,13 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.partialMergesFailed
out <- m.replicationTotal
out <- m.replicationFailed
out <- m.fetchReplicaStateTotal
out <- m.fetchReplicaStateFailed
out <- m.initialSyncTotal
out <- m.initialSyncCompleted
out <- m.initialSyncDuration
out <- m.persistTotal
out <- m.persistFailed
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -248,4 +290,11 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfCountersPerUser(out, m.partialMergesFailed, "alertmanager_partial_state_merges_failed_total")
data.SendSumOfCountersPerUser(out, m.replicationTotal, "alertmanager_state_replication_total")
data.SendSumOfCountersPerUser(out, m.replicationFailed, "alertmanager_state_replication_failed_total")
data.SendSumOfCounters(out, m.fetchReplicaStateTotal, "alertmanager_state_fetch_replica_state_total")
data.SendSumOfCounters(out, m.fetchReplicaStateFailed, "alertmanager_state_fetch_replica_state_failed_total")
data.SendSumOfCounters(out, m.initialSyncTotal, "alertmanager_state_initial_sync_total")
data.SendSumOfCountersWithLabels(out, m.initialSyncCompleted, "alertmanager_state_initial_sync_completed_total", "outcome")
data.SendSumOfHistograms(out, m.initialSyncDuration, "alertmanager_state_initial_sync_duration_seconds")
data.SendSumOfCounters(out, m.persistTotal, "alertmanager_state_persist_total")
data.SendSumOfCounters(out, m.persistFailed, "alertmanager_state_persist_failed_total")
}
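
Note: unlike the per-user replication counters above, the new counters are emitted without a user label: SendSumOfCounters aggregates the local alertmanager_* counters across all per-tenant registries into a single cortex_-prefixed series. The helper below is only a rough sketch of that aggregation under assumed names; the real util.MetricFamiliesPerUser code is not part of this diff.

// A rough sketch, not the actual util.MetricFamiliesPerUser implementation (which is
// outside this diff), of how a counter can be summed across per-tenant registries
// and re-exported as a single total without the user label.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// sumCounterAcrossRegistries gathers each registry and adds up every sample of the
// named counter, regardless of its labels.
func sumCounterAcrossRegistries(regs []*prometheus.Registry, name string) (float64, error) {
	var total float64
	for _, reg := range regs {
		mfs, err := reg.Gather()
		if err != nil {
			return 0, err
		}
		for _, mf := range mfs {
			if mf.GetName() != name || mf.GetType() != dto.MetricType_COUNTER {
				continue
			}
			for _, m := range mf.GetMetric() {
				total += m.GetCounter().GetValue()
			}
		}
	}
	return total, nil
}

func main() {
	// Two per-tenant registries, each with its own local persist counter.
	var regs []*prometheus.Registry
	for i := 0; i < 2; i++ {
		reg := prometheus.NewRegistry()
		c := prometheus.NewCounter(prometheus.CounterOpts{
			Name: "alertmanager_state_persist_total",
			Help: "Number of times we have tried to persist the running state to storage.",
		})
		reg.MustRegister(c)
		c.Add(float64(i + 1)) // tenant 1 persisted once, tenant 2 twice
		regs = append(regs, reg)
	}

	total, err := sumCounterAcrossRegistries(regs, "alertmanager_state_persist_total")
	if err != nil {
		panic(err)
	}
	fmt.Println(total) // 3 -- the value that would back cortex_alertmanager_state_persist_total
}
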
61 changes: 61 additions & 0 deletions pkg/alertmanager/alertmanager_metrics_test.go
@@ -254,6 +254,26 @@ func TestAlertmanagerMetricsStore(t *testing.T) {
# HELP cortex_alertmanager_silences_snapshot_size_bytes Size of the last silence snapshot in bytes.
# TYPE cortex_alertmanager_silences_snapshot_size_bytes gauge
cortex_alertmanager_silences_snapshot_size_bytes 111
# HELP cortex_alertmanager_state_fetch_replica_state_failed_total Number of times we have failed to read and merge the full state from another replica.
# TYPE cortex_alertmanager_state_fetch_replica_state_failed_total counter
cortex_alertmanager_state_fetch_replica_state_failed_total 0
# HELP cortex_alertmanager_state_fetch_replica_state_total Number of times we have tried to read and merge the full state from another replica.
# TYPE cortex_alertmanager_state_fetch_replica_state_total counter
cortex_alertmanager_state_fetch_replica_state_total 0
# HELP cortex_alertmanager_state_initial_sync_duration_seconds Time spent syncing initial state from peers or storage.
# TYPE cortex_alertmanager_state_initial_sync_duration_seconds histogram
cortex_alertmanager_state_initial_sync_duration_seconds_bucket{le="+Inf"} 0
cortex_alertmanager_state_initial_sync_duration_seconds_sum 0
cortex_alertmanager_state_initial_sync_duration_seconds_count 0
# HELP cortex_alertmanager_state_initial_sync_total Number of times we have tried to sync initial state from peers or storage.
# TYPE cortex_alertmanager_state_initial_sync_total counter
cortex_alertmanager_state_initial_sync_total 0
# HELP cortex_alertmanager_state_persist_failed_total Number of times we have failed to persist the running state to storage.
# TYPE cortex_alertmanager_state_persist_failed_total counter
cortex_alertmanager_state_persist_failed_total 0
# HELP cortex_alertmanager_state_persist_total Number of times we have tried to persist the running state to storage.
# TYPE cortex_alertmanager_state_persist_total counter
cortex_alertmanager_state_persist_total 0
`))
require.NoError(t, err)
}
@@ -517,6 +537,26 @@ func TestAlertmanagerMetricsRemoval(t *testing.T) {
# HELP cortex_alertmanager_silences_snapshot_size_bytes Size of the last silence snapshot in bytes.
# TYPE cortex_alertmanager_silences_snapshot_size_bytes gauge
cortex_alertmanager_silences_snapshot_size_bytes 111
# HELP cortex_alertmanager_state_fetch_replica_state_failed_total Number of times we have failed to read and merge the full state from another replica.
# TYPE cortex_alertmanager_state_fetch_replica_state_failed_total counter
cortex_alertmanager_state_fetch_replica_state_failed_total 0
# HELP cortex_alertmanager_state_fetch_replica_state_total Number of times we have tried to read and merge the full state from another replica.
# TYPE cortex_alertmanager_state_fetch_replica_state_total counter
cortex_alertmanager_state_fetch_replica_state_total 0
# HELP cortex_alertmanager_state_initial_sync_duration_seconds Time spent syncing initial state from peers or storage.
# TYPE cortex_alertmanager_state_initial_sync_duration_seconds histogram
cortex_alertmanager_state_initial_sync_duration_seconds_bucket{le="+Inf"} 0
cortex_alertmanager_state_initial_sync_duration_seconds_sum 0
cortex_alertmanager_state_initial_sync_duration_seconds_count 0
# HELP cortex_alertmanager_state_initial_sync_total Number of times we have tried to sync initial state from peers or storage.
# TYPE cortex_alertmanager_state_initial_sync_total counter
cortex_alertmanager_state_initial_sync_total 0
# HELP cortex_alertmanager_state_persist_failed_total Number of times we have failed to persist the running state to storage.
# TYPE cortex_alertmanager_state_persist_failed_total counter
cortex_alertmanager_state_persist_failed_total 0
# HELP cortex_alertmanager_state_persist_total Number of times we have tried to persist the running state to storage.
# TYPE cortex_alertmanager_state_persist_total counter
cortex_alertmanager_state_persist_total 0
`))
require.NoError(t, err)

@@ -727,6 +767,27 @@ func TestAlertmanagerMetricsRemoval(t *testing.T) {
# HELP cortex_alertmanager_silences_snapshot_size_bytes Size of the last silence snapshot in bytes.
# TYPE cortex_alertmanager_silences_snapshot_size_bytes gauge
cortex_alertmanager_silences_snapshot_size_bytes 11

# HELP cortex_alertmanager_state_fetch_replica_state_failed_total Number of times we have failed to read and merge the full state from another replica.
# TYPE cortex_alertmanager_state_fetch_replica_state_failed_total counter
cortex_alertmanager_state_fetch_replica_state_failed_total 0
# HELP cortex_alertmanager_state_fetch_replica_state_total Number of times we have tried to read and merge the full state from another replica.
# TYPE cortex_alertmanager_state_fetch_replica_state_total counter
cortex_alertmanager_state_fetch_replica_state_total 0
# HELP cortex_alertmanager_state_initial_sync_duration_seconds Time spent syncing initial state from peers or storage.
# TYPE cortex_alertmanager_state_initial_sync_duration_seconds histogram
cortex_alertmanager_state_initial_sync_duration_seconds_bucket{le="+Inf"} 0
cortex_alertmanager_state_initial_sync_duration_seconds_sum 0
cortex_alertmanager_state_initial_sync_duration_seconds_count 0
# HELP cortex_alertmanager_state_initial_sync_total Number of times we have tried to sync initial state from peers or storage.
# TYPE cortex_alertmanager_state_initial_sync_total counter
cortex_alertmanager_state_initial_sync_total 0
# HELP cortex_alertmanager_state_persist_failed_total Number of times we have failed to persist the running state to storage.
# TYPE cortex_alertmanager_state_persist_failed_total counter
cortex_alertmanager_state_persist_failed_total 0
# HELP cortex_alertmanager_state_persist_total Number of times we have tried to persist the running state to storage.
# TYPE cortex_alertmanager_state_persist_total counter
cortex_alertmanager_state_persist_total 0
`))
require.NoError(t, err)
}
29 changes: 25 additions & 4 deletions pkg/alertmanager/state_persister.go
@@ -9,6 +9,8 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
@@ -53,17 +55,28 @@ type statePersister struct {
logger log.Logger

timeout time.Duration

persistTotal prometheus.Counter
persistFailed prometheus.Counter
}

// newStatePersister creates a new state persister.
func newStatePersister(cfg PersisterConfig, userID string, state PersistableState, store alertstore.AlertStore, l log.Logger) *statePersister {
func newStatePersister(cfg PersisterConfig, userID string, state PersistableState, store alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *statePersister {

s := &statePersister{
state: state,
store: store,
userID: userID,
logger: l,
timeout: defaultPersistTimeout,
persistTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "alertmanager_state_persist_total",
Help: "Number of times we have tried to persist the running state to remote storage.",
}),
persistFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "alertmanager_state_persist_failed_total",
Help: "Number of times we have failed to persist the running state to remote storage.",
}),
}

s.Service = services.NewTimerService(cfg.Interval, s.starting, s.iteration, nil)
@@ -84,15 +97,23 @@ func (s *statePersister) iteration(ctx context.Context) error {
return nil
}

func (s *statePersister) persist(ctx context.Context) error {
func (s *statePersister) persist(ctx context.Context) (err error) {
// Only the replica at position zero should write the state.
if s.state.Position() != 0 {
return nil
}

s.persistTotal.Inc()
defer func() {
if err != nil {
s.persistFailed.Inc()
}
}()

level.Debug(s.logger).Log("msg", "persisting state", "user", s.userID)

fs, err := s.state.GetFullState()
var fs *clusterpb.FullState
fs, err = s.state.GetFullState()
if err != nil {
return err
}
@@ -101,7 +122,7 @@ func (s *statePersister) persist(ctx context.Context) error {
defer cancel()

desc := alertspb.FullStateDesc{State: fs}
if err := s.store.SetFullState(ctx, s.userID, desc); err != nil {
if err = s.store.SetFullState(ctx, s.userID, desc); err != nil {
return err
}

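Side note on the persist() change above: switching to a named error return lets a single deferred check increment persistFailed on every error path. A minimal standalone sketch of the pattern, with hypothetical metric and function names (not code from this PR):

// A minimal, standalone illustration of the named-return pattern used in persist()
// above: every early "return err" is counted as a failure by one deferred check.
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	attempts = prometheus.NewCounter(prometheus.CounterOpts{Name: "op_total", Help: "Attempts."})
	failures = prometheus.NewCounter(prometheus.CounterOpts{Name: "op_failed_total", Help: "Failures."})
)

func doWork(fail bool) (err error) {
	attempts.Inc()
	defer func() {
		// Runs on every return path; increments the failure counter only when the
		// named return value holds an error.
		if err != nil {
			failures.Inc()
		}
	}()

	if fail {
		return errors.New("boom")
	}
	return nil
}

func main() {
	_ = doWork(true)
	_ = doWork(false)
	fmt.Println("attempts=2, failures=1") // expected counter values after the two calls
}

The alternative of incrementing the failure counter at each return site is easier to get wrong as new error paths are added.
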
2 changes: 1 addition & 1 deletion pkg/alertmanager/state_persister_test.go
@@ -87,7 +87,7 @@ func makeTestStatePersister(t *testing.T, position int, userID string) (*fakePer
store := &fakeStore{}
cfg := PersisterConfig{Interval: 1 * time.Second}

s := newStatePersister(cfg, userID, state, store, log.NewNopLogger())
s := newStatePersister(cfg, userID, state, store, log.NewNopLogger(), nil)

require.NoError(t, s.StartAsync(context.Background()))
t.Cleanup(func() {
46 changes: 46 additions & 0 deletions pkg/alertmanager/state_replication.go
@@ -23,6 +23,12 @@ import (
const (
defaultSettleReadTimeout = 15 * time.Second
defaultStoreReadTimeout = 15 * time.Second

// Initial sync outcome label values.
syncFromReplica = "from-replica"
syncFromStorage = "from-storage"
syncUserNotFound = "user-not-found"
syncFailed = "failed"
)

// state represents the Alertmanager silences and notification log internal state.
@@ -47,6 +53,11 @@ type state struct {
partialStateMergesFailed *prometheus.CounterVec
stateReplicationTotal *prometheus.CounterVec
stateReplicationFailed *prometheus.CounterVec
fetchReplicaStateTotal prometheus.Counter
fetchReplicaStateFailed prometheus.Counter
initialSyncTotal prometheus.Counter
initialSyncCompleted *prometheus.CounterVec
initialSyncDuration prometheus.Histogram

msgc chan *clusterpb.Part
}
@@ -81,7 +92,32 @@ func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.Ale
Name: "alertmanager_state_replication_failed_total",
Help: "Number of times we have failed to replicate a state to other alertmanagers.",
}, []string{"key"}),
fetchReplicaStateTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "alertmanager_state_fetch_replica_state_total",
Help: "Number of times we have tried to read and merge the full state from another replica.",
}),
fetchReplicaStateFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "alertmanager_state_fetch_replica_state_failed_total",
Help: "Number of times we have failed to read and merge the full state from another replica.",
}),
initialSyncTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "alertmanager_state_initial_sync_total",
Help: "Number of times we have tried to sync initial state from peers or remote storage.",
}),
initialSyncCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_state_initial_sync_completed_total",
Help: "Number of times we have completed syncing initial state for each possible outcome.",
}, []string{"outcome"}),
initialSyncDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "alertmanager_state_initial_sync_duration_seconds",
Help: "Time spent syncing initial state from peers or remote storage.",
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
}),
}
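// Pre-create each outcome label value so these counters are exported as zero before the first initial sync completes.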
s.initialSyncCompleted.WithLabelValues(syncFromReplica)
s.initialSyncCompleted.WithLabelValues(syncFromStorage)
s.initialSyncCompleted.WithLabelValues(syncUserNotFound)
s.initialSyncCompleted.WithLabelValues(syncFailed)

s.Service = services.NewBasicService(s.starting, s.running, nil)

@@ -154,6 +190,10 @@ func (s *state) GetFullState() (*clusterpb.FullState, error) {
// starting waits until the alertmanagers are ready (and sets the appropriate internal state when it is).
// The idea is that we don't want to start "working" before we get a chance to know most of the notifications and/or silences.
func (s *state) starting(ctx context.Context) error {
s.initialSyncTotal.Inc()
timer := prometheus.NewTimer(s.initialSyncDuration)
defer timer.ObserveDuration()

level.Info(s.logger).Log("msg", "Waiting for notification and silences to settle...")

// If the replication factor is <= 1, there is nowhere to obtain the state from.
@@ -166,13 +206,16 @@ func (s *state) starting(ctx context.Context) error {
readCtx, cancel := context.WithTimeout(ctx, s.settleReadTimeout)
defer cancel()

s.fetchReplicaStateTotal.Inc()
fullStates, err := s.replicator.ReadFullStateForUser(readCtx, s.userID)
if err == nil {
if err = s.mergeFullStates(fullStates); err == nil {
level.Info(s.logger).Log("msg", "state settled; proceeding")
s.initialSyncCompleted.WithLabelValues(syncFromReplica).Inc()
return nil
}
}
s.fetchReplicaStateFailed.Inc()

level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)

@@ -183,16 +226,19 @@ func (s *state) starting(ctx context.Context) error {
fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
if errors.Is(err, alertspb.ErrNotFound) {
level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID)
s.initialSyncCompleted.WithLabelValues(syncUserNotFound).Inc()
return nil
}
if err == nil {
if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
level.Info(s.logger).Log("msg", "state read from storage; proceeding")
s.initialSyncCompleted.WithLabelValues(syncFromStorage).Inc()
return nil
}
}

level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
s.initialSyncCompleted.WithLabelValues(syncFailed).Inc()

return nil
}