Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement quorum reads and merging for /v2/alerts and /v2/alerts/groups. #4127

Merged
merged 2 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ require (
github.com/felixge/fgprof v0.9.1
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.10.0
github.com/go-openapi/strfmt v0.20.0
github.com/go-openapi/swag v0.19.14
github.com/go-redis/redis/v8 v8.2.3
github.com/gocql/gocql v0.0.0-20200526081602-cd04bd7f22a7
github.com/gogo/protobuf v1.3.2
Expand Down
26 changes: 18 additions & 8 deletions integration/alertmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,21 +505,23 @@ func TestAlertmanagerSharding(t *testing.T) {
// Therefore, the alerts we posted should always be visible.

for _, c := range clients {
list, err := c.GetAlerts(context.Background())
list, err := c.GetAlertsV1(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, []string{"alert_1", "alert_2", "alert_3"}, alertNames(list))
}
}

// Endpoint: GET /alerts/groups
// Endpoint: GET /v2/alerts
{
// Writes do not block on the slowest replica, and reads do not
// currently merge results from multiple replicas, so we have to wait.
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
e2e.Equals(float64(3*testCfg.replicationFactor)),
[]string{"cortex_alertmanager_alerts_received_total"},
e2e.SkipMissingMetrics))
for _, c := range clients {
list, err := c.GetAlertsV2(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, []string{"alert_1", "alert_2", "alert_3"}, alertNames(list))
}
}

// Endpoint: GET /v2/alerts/groups
{
for _, c := range clients {
list, err := c.GetAlertGroups(context.Background())
require.NoError(t, err)
Expand All @@ -535,7 +537,15 @@ func TestAlertmanagerSharding(t *testing.T) {
require.Contains(t, groups, "group_2")
assert.ElementsMatch(t, []string{"alert_3"}, alertNames(groups["group_2"]))
}

// Note: /v1/alerts/groups does not exist.
}

// Check the alerts were eventually written to every replica.
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
e2e.Equals(float64(3*testCfg.replicationFactor)),
[]string{"cortex_alertmanager_alerts_received_total"},
e2e.SkipMissingMetrics))
})
}
}
31 changes: 30 additions & 1 deletion integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (c *Client) SendAlertToAlermanager(ctx context.Context, alert *model.Alert)
return nil
}

func (c *Client) GetAlerts(ctx context.Context) ([]model.Alert, error) {
func (c *Client) GetAlertsV1(ctx context.Context) ([]model.Alert, error) {
u := c.alertmanagerClient.URL("api/prom/api/v1/alerts", nil)

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
Expand Down Expand Up @@ -547,6 +547,35 @@ func (c *Client) GetAlerts(ctx context.Context) ([]model.Alert, error) {
return decoded.Data, nil
}

// GetAlertsV2 issues a GET against the Alertmanager "api/prom/api/v2/alerts"
// endpoint and decodes the JSON response body into a slice of alerts.
//
// It returns ErrNotFound when the server answers 404, a formatted error
// carrying the status code and body for any other non-2xx status, and the
// underlying error when the request or the JSON decoding fails.
func (c *Client) GetAlertsV2(ctx context.Context) ([]model.Alert, error) {
	endpoint := c.alertmanagerClient.URL("api/prom/api/v2/alerts", nil)

	request, err := http.NewRequest(http.MethodGet, endpoint.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("error creating request: %v", err)
	}

	response, payload, err := c.alertmanagerClient.Do(ctx, request)
	if err != nil {
		return nil, err
	}

	// 404 is mapped to the sentinel error first; every other non-2xx status
	// is reported verbatim together with the response body.
	switch {
	case response.StatusCode == http.StatusNotFound:
		return nil, ErrNotFound
	case response.StatusCode/100 != 2:
		return nil, fmt.Errorf("getting alerts failed with status %d and error %v", response.StatusCode, string(payload))
	}

	// The v2 endpoint body is decoded directly as a JSON array of alerts.
	alerts := []model.Alert{}
	if err := json.Unmarshal(payload, &alerts); err != nil {
		return nil, err
	}

	return alerts, nil
}

type AlertGroup struct {
Labels model.LabelSet `json:"labels"`
Alerts []model.Alert `json:"alerts"`
Expand Down
10 changes: 7 additions & 3 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,17 @@ func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
if strings.HasSuffix(p, "/v1/alerts") {
return true, merger.V1Alerts{}
}
if strings.HasSuffix(p, "/v2/alerts") {
return true, merger.V2Alerts{}
}
if strings.HasSuffix(p, "/v2/alerts/groups") {
return true, merger.V2AlertGroups{}
}
return false, nil
}

func (d *Distributor) isUnaryReadPath(p string) bool {
return strings.HasSuffix(p, "/v2/alerts") ||
strings.HasSuffix(p, "/alerts/groups") ||
strings.HasSuffix(p, "/silences") ||
return strings.HasSuffix(p, "/silences") ||
strings.HasSuffix(path.Dir(p), "/silence") ||
strings.HasSuffix(p, "/status") ||
strings.HasSuffix(p, "/receivers")
Expand Down
23 changes: 17 additions & 6 deletions pkg/alertmanager/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,34 @@ func TestDistributor_DistributeRequest(t *testing.T) {
route: "/v1/alerts",
responseBody: []byte(`{"status":"success","data":[]}`),
}, {
name: "Read /v2/alerts is sent to only 1 AM",
name: "Read /v2/alerts is sent to 3 AMs",
numAM: 5,
numHappyAM: 5,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusOK,
expectedTotalCalls: 1,
expectedTotalCalls: 3,
route: "/v2/alerts",
responseBody: []byte(`[]`),
}, {
name: "Read /alerts/groups is sent to only 1 AM",
name: "Read /v2/alerts/groups is sent to 3 AMs",
numAM: 5,
numHappyAM: 5,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusOK,
expectedTotalCalls: 1,
route: "/alerts/groups",
expectedTotalCalls: 3,
route: "/v2/alerts/groups",
responseBody: []byte(`[]`),
}, {
name: "Read /v1/alerts/groups not supported",
numAM: 5,
numHappyAM: 5,
replicationFactor: 3,
expStatusCode: http.StatusNotFound,
expectedTotalCalls: 0,
headersNotPreserved: true,
route: "/v1/alerts/groups",
}, {
name: "Write /alerts/groups not supported",
numAM: 5,
Expand Down Expand Up @@ -256,7 +267,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
func TestDistributor_IsPathSupported(t *testing.T) {
supported := map[string]bool{
"/alertmanager/api/v1/alerts": true,
"/alertmanager/api/v1/alerts/groups": true,
"/alertmanager/api/v1/alerts/groups": false,
"/alertmanager/api/v1/silences": true,
"/alertmanager/api/v1/silence/id": true,
"/alertmanager/api/v1/silence/anything": true,
Expand Down
104 changes: 104 additions & 0 deletions pkg/alertmanager/merger/v2_alert_groups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package merger

import (
"errors"
"sort"

"github.com/go-openapi/swag"
v2 "github.com/prometheus/alertmanager/api/v2"
v2_models "github.com/prometheus/alertmanager/api/v2/models"
prom_model "github.com/prometheus/common/model"
)

// V2AlertGroups implements the Merger interface for GET /v2/alerts/groups. It returns
// the union of alert groups over all the responses. When the same alert exists in the same
// group for multiple responses, the instance of that alert with the most recent UpdatedAt
// timestamp is returned in that group within the response.
//
// Groups from different responses are treated as the same group when they share both the
// receiver name and the fingerprint of their labels (see getGroupKey). The merged groups
// are returned sorted by labels and then by receiver name (see byGroup).
type V2AlertGroups struct{}

// MergeResponses decodes every response body as a list of v2 alert groups,
// concatenates the decoded lists, merges them with mergeV2AlertGroups and
// returns the merged list encoded as JSON.
//
// An error is returned as soon as any body fails to decode, or if the merge
// step itself reports an error.
func (V2AlertGroups) MergeResponses(in [][]byte) ([]byte, error) {
	combined := make(v2_models.AlertGroups, 0)
	for i := range in {
		decoded := make(v2_models.AlertGroups, 0)
		if err := swag.ReadJSON(in[i], &decoded); err != nil {
			return nil, err
		}
		combined = append(combined, decoded...)
	}

	result, err := mergeV2AlertGroups(combined)
	if err != nil {
		return nil, err
	}

	return swag.WriteJSON(result)
}

// mergeV2AlertGroups collapses a flat list of alert groups, typically gathered
// from several responses, into one entry per distinct group.
//
// Groups are considered the same when getGroupKey yields the same key, i.e.
// same receiver name and same label fingerprint. The alerts of matching groups
// are concatenated and then de-duplicated by mergeV2Alerts. The result is
// sorted using byGroup (labels first, then receiver name).
//
// An error is returned if any group is nil, has a nil receiver or a nil
// receiver name, or if mergeV2Alerts fails for any group.
//
// Note: the first group seen for each key is reused as the merged group, so
// its Alerts field in the input is modified.
func mergeV2AlertGroups(in v2_models.AlertGroups) (v2_models.AlertGroups, error) {
	// Gather lists of all alerts for each distinct group.
	groups := make(map[groupKey]*v2_models.AlertGroup)
	for _, group := range in {
		// The list holds pointers, so a nil element is possible (presumably
		// what a JSON null array entry decodes to). Reject it explicitly
		// instead of panicking on the dereference below.
		if group == nil {
			return nil, errors.New("unexpected nil group")
		}
		if group.Receiver == nil {
			return nil, errors.New("unexpected nil receiver")
		}
		if group.Receiver.Name == nil {
			return nil, errors.New("unexpected nil receiver name")
		}

		key := getGroupKey(group)
		current, ok := groups[key]
		if !ok {
			groups[key] = group
			continue
		}
		current.Alerts = append(current.Alerts, group.Alerts...)
	}

	// Merge duplicates of the same alert within each group and collect the
	// groups into the result list.
	result := make(v2_models.AlertGroups, 0, len(groups))
	for _, group := range groups {
		alerts, err := mergeV2Alerts(group.Alerts)
		if err != nil {
			return nil, err
		}
		group.Alerts = alerts
		result = append(result, group)
	}

	// Mimic Alertmanager which returns groups ordered by labels and receiver.
	// Map iteration order is random, so sorting also makes the output deterministic.
	sort.Sort(byGroup(result))

	return result, nil
}

// getGroupKey builds the identity used to match a group against groups from
// other responses. The key combines the signature of the group's labels, as
// computed by LabelsToSignature, with the receiver name.
//
// The caller must ensure group.Receiver and group.Receiver.Name are non-nil;
// both are dereferenced here without checks.
func getGroupKey(group *v2_models.AlertGroup) groupKey {
	var key groupKey
	key.fingerprint = prom_model.LabelsToSignature(group.Labels)
	key.receiver = *group.Receiver.Name
	return key
}

// groupKey is the comparable identity of an alert group, used as a map key
// when merging groups. It is populated by getGroupKey.
type groupKey struct {
	// fingerprint is the signature of the group's labels.
	fingerprint uint64
	// receiver is the name of the group's receiver.
	receiver string
}

// byGroup is a sort.Interface adapter over the OpenAPI alert group list that
// reproduces the ordering of Alertmanager's dispatch.AlertGroups: groups are
// ordered by their labels, and groups with equal labels by receiver name.
//
// Every element must have a non-nil Receiver and Receiver.Name, since Less
// dereferences both.
type byGroup v2_models.AlertGroups

// Len reports the number of groups.
func (ag byGroup) Len() int { return len(ag) }

// Swap exchanges the groups at positions i and j.
func (ag byGroup) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }

// Less reports whether the group at i sorts before the group at j.
func (ag byGroup) Less(i, j int) bool {
	left, right := ag[i], ag[j]

	leftLabels := v2.APILabelSetToModelLabelSet(left.Labels)
	rightLabels := v2.APILabelSetToModelLabelSet(right.Labels)

	// Labels decide the order unless they are identical, in which case the
	// receiver name is the tie-breaker.
	if !leftLabels.Equal(rightLabels) {
		return leftLabels.Before(rightLabels)
	}
	return *left.Receiver.Name < *right.Receiver.Name
}
Loading