-
Notifications
You must be signed in to change notification settings - Fork 823
Implement gRPC based initial state settling in alertmanager. #3925
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
bc41a4c
19f71ae
aed52b3
77afe99
069b432
2bdac61
282744a
4c8e4fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,6 +129,8 @@ type Replicator interface { | |
// The alertmanager replication protocol relies on a position related to other replicas. | ||
// This position is then used to identify who should notify about the alert first. | ||
GetPositionForUser(userID string) int | ||
// ReadFullStateForUser obtains the full state from cluster peers. | ||
ReadFullStateForUser(context.Context, string) ([]*clusterpb.FullState, error) | ||
} | ||
|
||
// New creates a new Alertmanager. | ||
|
@@ -159,13 +161,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) { | |
am.state = cfg.Peer | ||
} else if cfg.ShardingEnabled { | ||
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication") | ||
state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry) | ||
|
||
if err := state.Service.StartAsync(context.Background()); err != nil { | ||
return nil, errors.Wrap(err, "failed to start ring-based replication service") | ||
} | ||
|
||
am.state = state | ||
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry) | ||
} else { | ||
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication") | ||
am.state = &NilPeer{} | ||
|
@@ -203,6 +199,13 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) { | |
c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry) | ||
am.silences.SetBroadcast(c.Broadcast) | ||
|
||
// State replication needs to be started after the state keys are defined. | ||
if service, ok := am.state.(services.Service); ok { | ||
if err := service.StartAsync(context.Background()); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't wait until started here (and it's correct). However, this means that we may start using this […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have been assuming we want to settle in the background, because an […] Perhaps safer to change it to wait for now, and explore doing it in the background as a separate piece of work? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's worth noting that "start using" is not as one would think. Yes, we'll accept alerts, silences, etc. However, we'll wait for the state to be replicated before we send a notification. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will leave as-is. My (current) understanding is as Josh said - there is no requirement to block (except for notifications, which are blocked via the call into […] |
||
return nil, errors.Wrap(err, "failed to start ring-based replication service") | ||
} | ||
} | ||
|
||
am.pipelineBuilder = notify.NewPipelineBuilder(am.registry) | ||
|
||
am.wg.Add(1) | ||
|
@@ -373,6 +376,13 @@ func (am *Alertmanager) mergePartialExternalState(part *clusterpb.Part) error { | |
return errors.New("ring-based sharding not enabled") | ||
} | ||
|
||
// getFullState returns the full state held by this Alertmanager's state | ||
// backend. It only succeeds when am.state is a *state; in that case the call | ||
// is delegated to that value's GetFullState method and its result and error | ||
// are returned unchanged. | ||
// | ||
// For any other am.state implementation it returns a nil state together with | ||
// a "ring-based sharding not enabled" error. | ||
func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) { | ||
// The local variable shadows the state type name inside the if body only; | ||
// the type assertion itself still refers to the type. | ||
if state, ok := am.state.(*state); ok { | ||
return state.GetFullState() | ||
} | ||
return nil, errors.New("ring-based sharding not enabled") | ||
} | ||
|
||
// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a | ||
// list of receiver config. | ||
func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, logger log.Logger) (map[string][]notify.Integration, error) { | ||
|
Uh oh!
There was an error while loading. Please reload this page.