Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement quorum reads and response merging for /v1/alerts. #4126

Merged
merged 2 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions integration/alertmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,13 @@ func TestAlertmanagerSharding(t *testing.T) {
require.NoError(t, err)
err = c3.SendAlertToAlermanager(context.Background(), alert(3, 2))
require.NoError(t, err)

// API does not block for the write slowest replica, and reads do not
// currently merge results from multiple replicas, so we have to wait.
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
e2e.Equals(float64(3*testCfg.replicationFactor)),
[]string{"cortex_alertmanager_alerts_received_total"},
e2e.SkipMissingMetrics))
}

// Endpoint: GET /alerts
{
// Reads will query at least two replicas and merge the results.
// Therefore, the alerts we posted should always be visible.

for _, c := range clients {
list, err := c.GetAlerts(context.Background())
require.NoError(t, err)
Expand All @@ -517,6 +513,13 @@ func TestAlertmanagerSharding(t *testing.T) {

// Endpoint: GET /alerts/groups
{
// Writes do not block for the write slowest replica, and reads do not
// currently merge results from multiple replicas, so we have to wait.
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
e2e.Equals(float64(3*testCfg.replicationFactor)),
[]string{"cortex_alertmanager_alerts_received_total"},
e2e.SkipMissingMetrics))

for _, c := range clients {
list, err := c.GetAlertGroups(context.Background())
require.NoError(t, err)
Expand Down
71 changes: 54 additions & 17 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/alertmanager/merger"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/tenant"
Expand Down Expand Up @@ -67,7 +68,8 @@ func (d *Distributor) running(ctx context.Context) error {
// IsPathSupported returns true if the given route is currently supported by the Distributor.
func (d *Distributor) IsPathSupported(p string) bool {
// API can be found at https://petstore.swagger.io/?url=https://raw.githubusercontent.com/prometheus/alertmanager/master/api/v2/openapi.yaml.
return d.isQuorumWritePath(p) || d.isUnaryWritePath(p) || d.isUnaryDeletePath(p) || d.isUnaryReadPath(p)
isQuorumReadPath, _ := d.isQuorumReadPath(p)
return d.isQuorumWritePath(p) || d.isUnaryWritePath(p) || d.isUnaryDeletePath(p) || d.isUnaryReadPath(p) || isQuorumReadPath
}

func (d *Distributor) isQuorumWritePath(p string) bool {
Expand All @@ -82,8 +84,15 @@ func (d *Distributor) isUnaryDeletePath(p string) bool {
return strings.HasSuffix(path.Dir(p), "/silence")
}

func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
if strings.HasSuffix(p, "/v1/alerts") {
return true, merger.V1Alerts{}
}
return false, nil
}

func (d *Distributor) isUnaryReadPath(p string) bool {
return strings.HasSuffix(p, "/alerts") ||
return strings.HasSuffix(p, "/v2/alerts") ||
strings.HasSuffix(p, "/alerts/groups") ||
strings.HasSuffix(p, "/silences") ||
strings.HasSuffix(path.Dir(p), "/silence") ||
Expand All @@ -109,7 +118,7 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)

if r.Method == http.MethodPost {
if d.isQuorumWritePath(r.URL.Path) {
d.doQuorumWrite(userID, w, r, logger)
d.doQuorum(userID, w, r, logger, merger.Noop{})
return
}
if d.isUnaryWritePath(r.URL.Path) {
Expand All @@ -124,6 +133,10 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
}
}
if r.Method == http.MethodGet || r.Method == http.MethodHead {
if ok, m := d.isQuorumReadPath(r.URL.Path); ok {
d.doQuorum(userID, w, r, logger, m)
return
}
if d.isUnaryReadPath(r.URL.Path) {
d.doUnary(userID, w, r, logger)
return
Expand All @@ -133,7 +146,7 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
http.Error(w, "route not supported by distributor", http.StatusNotFound)
}

func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger, m merger.Merger) {
var body []byte
var err error
if r.Body != nil {
Expand All @@ -149,13 +162,13 @@ func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *htt
}
}

var firstSuccessfulResponse *httpgrpc.HTTPResponse
var firstSuccessfulResponseMtx sync.Mutex
var responses []*httpgrpc.HTTPResponse
var responsesMtx sync.Mutex
grpcHeaders := httpToHttpgrpcHeaders(r.Header)
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
// Use a background context to make sure all alertmanagers get the request even if we return early.
localCtx := user.InjectOrgID(context.Background(), userID)
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorumWrite")
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
defer sp.Finish()

resp, err := d.doRequest(localCtx, am, &httpgrpc.HTTPRequest{
Expand All @@ -172,11 +185,9 @@ func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *htt
return httpgrpc.ErrorFromHTTPResponse(resp)
}

firstSuccessfulResponseMtx.Lock()
if firstSuccessfulResponse == nil {
firstSuccessfulResponse = resp
}
firstSuccessfulResponseMtx.Unlock()
responsesMtx.Lock()
responses = append(responses, resp)
responsesMtx.Unlock()

return nil
}, func() {})
Expand All @@ -186,12 +197,12 @@ func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *htt
return
}

firstSuccessfulResponseMtx.Lock() // Another request might be ongoing after quorum.
resp := firstSuccessfulResponse
firstSuccessfulResponseMtx.Unlock()
responsesMtx.Lock() // Another request might be ongoing after quorum.
resps := responses
responsesMtx.Unlock()

if resp != nil {
respondFromHTTPGRPCResponse(w, resp)
if len(resps) > 0 {
respondFromMultipleHTTPGRPCResponses(w, logger, resps, m)
} else {
// This should not happen.
level.Error(logger).Log("msg", "distributor did not receive any response from alertmanagers, but there were no errors")
Expand Down Expand Up @@ -287,3 +298,29 @@ func httpToHttpgrpcHeaders(hs http.Header) []*httpgrpc.Header {
}
return result
}

func respondFromMultipleHTTPGRPCResponses(w http.ResponseWriter, logger log.Logger, responses []*httpgrpc.HTTPResponse, merger merger.Merger) {
bodies := make([][]byte, len(responses))
for i, r := range responses {
bodies[i] = r.Body
}

body, err := merger.MergeResponses(bodies)
if err != nil {
level.Error(logger).Log("msg", "failed to merge responses for request", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// It is assumed by using this function, the caller knows that the responses it receives
// have already need checked for success or failure, and that the headers will always
// match due to the nature of the request. If this is not the case, a different merge
// function should be implemented to cope with the differing responses.
response := &httpgrpc.HTTPResponse{
Code: responses[0].Code,
Headers: responses[0].Headers,
Body: body,
}

respondFromHTTPGRPCResponse(w, response)
}
42 changes: 33 additions & 9 deletions pkg/alertmanager/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func TestDistributor_DistributeRequest(t *testing.T) {
expectedTotalCalls int
headersNotPreserved bool
route string
// Paths where responses are merged, we need to supply a valid response body.
// Note that the actual merging logic is tested elsewhere (merger_test.go).
responseBody []byte
}{
{
name: "Write /alerts, Simple AM request, all AM healthy",
Expand Down Expand Up @@ -68,14 +71,24 @@ func TestDistributor_DistributeRequest(t *testing.T) {
expectedTotalCalls: 3,
route: "/alerts",
}, {
name: "Read /alerts is sent to only 1 AM",
name: "Read /v1/alerts is sent to 3 AMs",
numAM: 5,
numHappyAM: 5,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusOK,
expectedTotalCalls: 3,
route: "/v1/alerts",
responseBody: []byte(`{"status":"success","data":[]}`),
}, {
name: "Read /v2/alerts is sent to only 1 AM",
numAM: 5,
numHappyAM: 5,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusOK,
expectedTotalCalls: 1,
route: "/alerts",
route: "/v2/alerts",
}, {
name: "Read /alerts/groups is sent to only 1 AM",
numAM: 5,
Expand Down Expand Up @@ -189,7 +202,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
route := "/alertmanager/api/v1" + c.route
d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor)
d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor, c.responseBody)
t.Cleanup(cleanup)

ctx := user.InjectOrgID(context.Background(), "1")
Expand All @@ -207,7 +220,6 @@ func TestDistributor_DistributeRequest(t *testing.T) {
w := httptest.NewRecorder()
d.DistributeRequest(w, req)
resp := w.Result()

require.Equal(t, c.expStatusCode, resp.StatusCode)

if !c.headersNotPreserved {
Expand Down Expand Up @@ -252,26 +264,35 @@ func TestDistributor_IsPathSupported(t *testing.T) {
"/alertmanager/api/v1/status": true,
"/alertmanager/api/v1/receivers": true,
"/alertmanager/api/v1/other": false,
"/alertmanager/api/v2/alerts": true,
"/alertmanager/api/v2/alerts/groups": true,
"/alertmanager/api/v2/silences": true,
"/alertmanager/api/v2/silence/id": true,
"/alertmanager/api/v2/silence/anything": true,
"/alertmanager/api/v2/silence/really": true,
"/alertmanager/api/v2/status": true,
"/alertmanager/api/v2/receivers": true,
"/alertmanager/api/v2/other": false,
"/alertmanager/other": false,
"/other": false,
}

for path, isSupported := range supported {
t.Run(path, func(t *testing.T) {
d, _, cleanup := prepare(t, 1, 1, 1)
d, _, cleanup := prepare(t, 1, 1, 1, []byte{})
t.Cleanup(cleanup)
require.Equal(t, isSupported, d.IsPathSupported(path))
})
}
}

func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int) (*Distributor, []*mockAlertmanager, func()) {
func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBody []byte) (*Distributor, []*mockAlertmanager, func()) {
ams := []*mockAlertmanager{}
for i := 0; i < numHappyAM; i++ {
ams = append(ams, newMockAlertmanager(i, true))
ams = append(ams, newMockAlertmanager(i, true, responseBody))
}
for i := numHappyAM; i < numAM; i++ {
ams = append(ams, newMockAlertmanager(i, false))
ams = append(ams, newMockAlertmanager(i, false, responseBody))
}

// Use a real ring with a mock KV store to test ring RF logic.
Expand Down Expand Up @@ -332,13 +353,15 @@ type mockAlertmanager struct {
mtx sync.Mutex
myAddr string
happy bool
responseBody []byte
}

func newMockAlertmanager(idx int, happy bool) *mockAlertmanager {
func newMockAlertmanager(idx int, happy bool, responseBody []byte) *mockAlertmanager {
return &mockAlertmanager{
receivedRequests: make(map[string]map[int]int),
myAddr: fmt.Sprintf("127.0.0.1:%05d", 10000+idx),
happy: happy,
responseBody: responseBody,
}
}

Expand Down Expand Up @@ -370,6 +393,7 @@ func (am *mockAlertmanager) HandleRequest(_ context.Context, in *httpgrpc.HTTPRe
Values: []string{"ok-option-1", "ok-option-2"},
},
},
Body: am.responseBody,
}, nil
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/alertmanager/merger/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package merger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super-Nit: Not sure this package carries its own weight. I know there will be v2 implementation too, but it's still looks pretty thin. Same goes for Merger interface.

Copy link
Contributor Author

@stevesg stevesg Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree and wasn't 100% sure what to do. The pkg/alertmanager package is getting very busy, was my reasoning here.

Would a pkg/alertmanager/distributor perhaps be better?

Also regarding the interface, I'd be equally happy with functions, if that's what you meant? (I was mainly taking inspiration from here

type Merger interface {
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would just use pkg/alertmanager, I think it's still a small package.

Also regarding the interface, I'd be equally happy with functions, if that's what you meant?

Yes, I was thinking about just using functions here. My reasoning is that 1) we're only going to have 2 implementations (v1, v2 api) and 2) client doesn't need to supply its own implementation. But I am first to admit this is just a personal preference for this specific case, and don't mind keeping it as-is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion here. I personally did like the merger package, given it's content is self contained but moving to pkg/alertmanager looks good to me as well. Let's just not block on this. V2 merging PR is awaiting for this PR! 😉

Would a pkg/alertmanager/distributor perhaps be better?

I wouldn't name the package distributor because we already have pkg/distributor and in the past (when we tried pkg/alertmanager/distributor at some point) we realised it was causing much confusion when reading the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving as-is to unblock, happy to refactor after the fact.


// Merger represents logic for merging response bodies.
type Merger interface {
MergeResponses([][]byte) ([]byte, error)
}

// Noop is an implementation of the Merger interface which does not actually merge
// responses, but just returns an arbitrary response(the first in the list). It can
// be used for write requests where the response is either empty or inconsequential.
type Noop struct{}

func (Noop) MergeResponses(in [][]byte) ([]byte, error) {
return in[0], nil
}
77 changes: 77 additions & 0 deletions pkg/alertmanager/merger/v1_alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package merger

import (
"encoding/json"
"fmt"
"sort"

v1 "github.com/prometheus/alertmanager/api/v1"
)

const (
statusSuccess = "success"
)

// V1Alerts implements the Merger interface for GET /v1/alerts. It returns the union of alerts over
// all the responses. When the same alert exists in multiple responses, the alert instance in the
// earliest response is returned in the final response. We cannot use the UpdatedAt timestamp as
// for V2Alerts, because the v1 API does not provide it.
type V1Alerts struct{}

func (V1Alerts) MergeResponses(in [][]byte) ([]byte, error) {
type bodyType struct {
Status string `json:"status"`
Data []*v1.Alert `json:"data"`
}

alerts := make([]*v1.Alert, 0)
for _, body := range in {
parsed := bodyType{}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, err
}
if parsed.Status != statusSuccess {
return nil, fmt.Errorf("unable to merge response of status: %s", parsed.Status)
}
alerts = append(alerts, parsed.Data...)
}

merged, err := mergeV1Alerts(alerts)
if err != nil {
return nil, err
}
body := bodyType{
Status: statusSuccess,
Data: merged,
}

result, err := json.Marshal(body)
if err != nil {
return nil, err
}

return result, nil
}

func mergeV1Alerts(in []*v1.Alert) ([]*v1.Alert, error) {
// Select an arbitrary alert for each distinct alert.
alerts := make(map[string]*v1.Alert)
for _, alert := range in {
key := alert.Fingerprint
if _, ok := alerts[key]; !ok {
alerts[key] = alert
}
}

result := make([]*v1.Alert, 0, len(alerts))
for _, alert := range alerts {
result = append(result, alert)
}

// Mimic Alertmanager which returns alerts ordered by fingerprint (as string).
sort.Slice(result, func(i, j int) bool {
return result[i].Fingerprint < result[j].Fingerprint
})

return result, nil
}
Loading