Skip to content

Commit 653affe

Browse files
authored
Implement quorum reads and merging for /v2/silences and /v2/silence/{id}. (#4141)
Reading silence listings and individual silences will now read from a quorum of replicas and return a merged response. In both cases, in the presence of duplicate silences with the same "id", the silence with the most recent "updatedAt" timestamp is returned. Note that this does not yet bring full consistency to silences, as we are not able to perform quorum writing of silences without upstream changes to Alertmanager. However, it will still be an improvement: - Reads will be consistent more often, being able to take state from multiple replicas instead of just one. - Requests will be resilient to single replica failure, as the request is essentially attempted on all three replicas. A possible extension to this change would be to block for all replicas to reply, but still allow a single failure. This would mean that reads are consistent for the case when all replicas are contactable. Signed-off-by: Steve Simpson <[email protected]>
1 parent 0732874 commit 653affe

File tree

8 files changed

+463
-17
lines changed

8 files changed

+463
-17
lines changed

integration/alertmanager_test.go

+38-9
Original file line numberDiff line numberDiff line change
@@ -386,29 +386,58 @@ func TestAlertmanagerSharding(t *testing.T) {
386386
assert.Equal(t, s3, ids[id3].Status.State)
387387
}
388388

389-
// Endpoint: GET /silences
389+
// Endpoint: GET /v1/silences
390390
{
391391
for _, c := range clients {
392-
list, err := c.GetSilences(context.Background())
392+
list, err := c.GetSilencesV1(context.Background())
393393
require.NoError(t, err)
394394
assertSilences(list, types.SilenceStateActive, types.SilenceStateActive, types.SilenceStateActive)
395395
}
396396
}
397397

398-
// Endpoint: GET /silence/{id}
398+
// Endpoint: GET /v2/silences
399399
{
400400
for _, c := range clients {
401-
sil1, err := c.GetSilence(context.Background(), id1)
401+
list, err := c.GetSilencesV2(context.Background())
402+
require.NoError(t, err)
403+
assertSilences(list, types.SilenceStateActive, types.SilenceStateActive, types.SilenceStateActive)
404+
}
405+
}
406+
407+
// Endpoint: GET /v1/silence/{id}
408+
{
409+
for _, c := range clients {
410+
sil1, err := c.GetSilenceV1(context.Background(), id1)
411+
require.NoError(t, err)
412+
assert.Equal(t, comment(1), sil1.Comment)
413+
assert.Equal(t, types.SilenceStateActive, sil1.Status.State)
414+
415+
sil2, err := c.GetSilenceV1(context.Background(), id2)
416+
require.NoError(t, err)
417+
assert.Equal(t, comment(2), sil2.Comment)
418+
assert.Equal(t, types.SilenceStateActive, sil2.Status.State)
419+
420+
sil3, err := c.GetSilenceV1(context.Background(), id3)
421+
require.NoError(t, err)
422+
assert.Equal(t, comment(3), sil3.Comment)
423+
assert.Equal(t, types.SilenceStateActive, sil3.Status.State)
424+
}
425+
}
426+
427+
// Endpoint: GET /v2/silence/{id}
428+
{
429+
for _, c := range clients {
430+
sil1, err := c.GetSilenceV2(context.Background(), id1)
402431
require.NoError(t, err)
403432
assert.Equal(t, comment(1), sil1.Comment)
404433
assert.Equal(t, types.SilenceStateActive, sil1.Status.State)
405434

406-
sil2, err := c.GetSilence(context.Background(), id2)
435+
sil2, err := c.GetSilenceV2(context.Background(), id2)
407436
require.NoError(t, err)
408437
assert.Equal(t, comment(2), sil2.Comment)
409438
assert.Equal(t, types.SilenceStateActive, sil2.Status.State)
410439

411-
sil3, err := c.GetSilence(context.Background(), id3)
440+
sil3, err := c.GetSilenceV2(context.Background(), id3)
412441
require.NoError(t, err)
413442
assert.Equal(t, comment(3), sil3.Comment)
414443
assert.Equal(t, types.SilenceStateActive, sil3.Status.State)
@@ -445,7 +474,7 @@ func TestAlertmanagerSharding(t *testing.T) {
445474
require.NoError(t, waitForSilences("expired", 1*testCfg.replicationFactor))
446475

447476
for _, c := range clients {
448-
list, err := c.GetSilences(context.Background())
477+
list, err := c.GetSilencesV2(context.Background())
449478
require.NoError(t, err)
450479
assertSilences(list, types.SilenceStateActive, types.SilenceStateExpired, types.SilenceStateActive)
451480
}
@@ -455,7 +484,7 @@ func TestAlertmanagerSharding(t *testing.T) {
455484
require.NoError(t, waitForSilences("expired", 2*testCfg.replicationFactor))
456485

457486
for _, c := range clients {
458-
list, err := c.GetSilences(context.Background())
487+
list, err := c.GetSilencesV2(context.Background())
459488
require.NoError(t, err)
460489
assertSilences(list, types.SilenceStateActive, types.SilenceStateExpired, types.SilenceStateExpired)
461490
}
@@ -465,7 +494,7 @@ func TestAlertmanagerSharding(t *testing.T) {
465494
require.NoError(t, waitForSilences("expired", 3*testCfg.replicationFactor))
466495

467496
for _, c := range clients {
468-
list, err := c.GetSilences(context.Background())
497+
list, err := c.GetSilencesV2(context.Background())
469498
require.NoError(t, err)
470499
assertSilences(list, types.SilenceStateExpired, types.SilenceStateExpired, types.SilenceStateExpired)
471500
}

integration/e2ecortex/client.go

+60-2
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,7 @@ func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) (stri
651651
return decoded.Data.SilenceID, nil
652652
}
653653

654-
func (c *Client) GetSilences(ctx context.Context) ([]types.Silence, error) {
654+
func (c *Client) GetSilencesV1(ctx context.Context) ([]types.Silence, error) {
655655
u := c.alertmanagerClient.URL("api/prom/api/v1/silences", nil)
656656

657657
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
@@ -689,7 +689,36 @@ func (c *Client) GetSilences(ctx context.Context) ([]types.Silence, error) {
689689
return decoded.Data, nil
690690
}
691691

692-
func (c *Client) GetSilence(ctx context.Context, id string) (types.Silence, error) {
692+
func (c *Client) GetSilencesV2(ctx context.Context) ([]types.Silence, error) {
693+
u := c.alertmanagerClient.URL("api/prom/api/v2/silences", nil)
694+
695+
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
696+
if err != nil {
697+
return nil, fmt.Errorf("error creating request: %v", err)
698+
}
699+
700+
resp, body, err := c.alertmanagerClient.Do(ctx, req)
701+
if err != nil {
702+
return nil, err
703+
}
704+
705+
if resp.StatusCode == http.StatusNotFound {
706+
return nil, ErrNotFound
707+
}
708+
709+
if resp.StatusCode/100 != 2 {
710+
return nil, fmt.Errorf("getting silences failed with status %d and error %v", resp.StatusCode, string(body))
711+
}
712+
713+
decoded := []types.Silence{}
714+
if err := json.Unmarshal(body, &decoded); err != nil {
715+
return nil, err
716+
}
717+
718+
return decoded, nil
719+
}
720+
721+
func (c *Client) GetSilenceV1(ctx context.Context, id string) (types.Silence, error) {
693722
u := c.alertmanagerClient.URL(fmt.Sprintf("api/prom/api/v1/silence/%s", url.PathEscape(id)), nil)
694723

695724
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
@@ -727,6 +756,35 @@ func (c *Client) GetSilence(ctx context.Context, id string) (types.Silence, erro
727756
return decoded.Data, nil
728757
}
729758

759+
func (c *Client) GetSilenceV2(ctx context.Context, id string) (types.Silence, error) {
760+
u := c.alertmanagerClient.URL(fmt.Sprintf("api/prom/api/v2/silence/%s", url.PathEscape(id)), nil)
761+
762+
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
763+
if err != nil {
764+
return types.Silence{}, fmt.Errorf("error creating request: %v", err)
765+
}
766+
767+
resp, body, err := c.alertmanagerClient.Do(ctx, req)
768+
if err != nil {
769+
return types.Silence{}, err
770+
}
771+
772+
if resp.StatusCode == http.StatusNotFound {
773+
return types.Silence{}, ErrNotFound
774+
}
775+
776+
if resp.StatusCode/100 != 2 {
777+
return types.Silence{}, fmt.Errorf("getting silence failed with status %d and error %v", resp.StatusCode, string(body))
778+
}
779+
780+
decoded := types.Silence{}
781+
if err := json.Unmarshal(body, &decoded); err != nil {
782+
return types.Silence{}, err
783+
}
784+
785+
return decoded, nil
786+
}
787+
730788
func (c *Client) DeleteSilence(ctx context.Context, id string) error {
731789
u := c.alertmanagerClient.URL(fmt.Sprintf("api/prom/api/v1/silence/%s", url.PathEscape(id)), nil)
732790

pkg/alertmanager/distributor.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,18 @@ func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
9494
if strings.HasSuffix(p, "/v2/alerts/groups") {
9595
return true, merger.V2AlertGroups{}
9696
}
97+
if strings.HasSuffix(p, "/v2/silences") {
98+
return true, merger.V2Silences{}
99+
}
100+
if strings.HasSuffix(path.Dir(p), "/v2/silence") {
101+
return true, merger.V2SilenceID{}
102+
}
97103
return false, nil
98104
}
99105

100106
func (d *Distributor) isUnaryReadPath(p string) bool {
101-
return strings.HasSuffix(p, "/silences") ||
102-
strings.HasSuffix(path.Dir(p), "/silence") ||
107+
return strings.HasSuffix(p, "/v1/silences") ||
108+
strings.HasSuffix(path.Dir(p), "/v1/silence") ||
103109
strings.HasSuffix(p, "/status") ||
104110
strings.HasSuffix(p, "/receivers")
105111
}

pkg/alertmanager/distributor_test.go

+25-4
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,24 @@ func TestDistributor_DistributeRequest(t *testing.T) {
119119
headersNotPreserved: true,
120120
route: "/alerts/groups",
121121
}, {
122-
name: "Read /silences is sent to only 1 AM",
122+
name: "Read /v1/silences is sent to only 1 AM",
123123
numAM: 5,
124124
numHappyAM: 5,
125125
replicationFactor: 3,
126126
isRead: true,
127127
expStatusCode: http.StatusOK,
128128
expectedTotalCalls: 1,
129-
route: "/silences",
129+
route: "/v1/silences",
130+
}, {
131+
name: "Read /v2/silences is sent to 3 AMs",
132+
numAM: 5,
133+
numHappyAM: 5,
134+
replicationFactor: 3,
135+
isRead: true,
136+
expStatusCode: http.StatusOK,
137+
expectedTotalCalls: 3,
138+
route: "/v2/silences",
139+
responseBody: []byte(`[]`),
130140
}, {
131141
name: "Write /silences is sent to only 1 AM",
132142
numAM: 5,
@@ -136,15 +146,26 @@ func TestDistributor_DistributeRequest(t *testing.T) {
136146
expectedTotalCalls: 1,
137147
route: "/silences",
138148
}, {
139-
name: "Read /silence/id is sent to only 1 AM",
149+
name: "Read /v1/silence/id is sent to only 1 AM",
140150
numAM: 5,
141151
numHappyAM: 5,
142152
replicationFactor: 3,
143153
isRead: true,
144154
expStatusCode: http.StatusOK,
145155
expectedTotalCalls: 1,
146-
route: "/silence/id",
156+
route: "/v1/silence/id",
147157
}, {
158+
name: "Read /v2/silence/id is sent to 3 AMs",
159+
numAM: 5,
160+
numHappyAM: 5,
161+
replicationFactor: 3,
162+
isRead: true,
163+
expStatusCode: http.StatusOK,
164+
expectedTotalCalls: 3,
165+
route: "/v2/silence/id",
166+
responseBody: []byte(`{"id":"aaa","updatedAt":"2020-01-01T00:00:00Z"}`),
167+
},
168+
{
148169
name: "Write /silence/id not supported",
149170
numAM: 5,
150171
numHappyAM: 5,
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package merger
2+
3+
import (
4+
"errors"
5+
6+
"github.com/go-openapi/swag"
7+
v2_models "github.com/prometheus/alertmanager/api/v2/models"
8+
)
9+
10+
// V2SilenceID implements the Merger interface for GET /v2/silence/{id}. It returns the most
11+
// recently updated silence (newest UpdatedAt timestamp).
12+
type V2SilenceID struct{}
13+
14+
func (V2SilenceID) MergeResponses(in [][]byte) ([]byte, error) {
15+
silences := make(v2_models.GettableSilences, 0)
16+
for _, body := range in {
17+
parsed := &v2_models.GettableSilence{}
18+
if err := swag.ReadJSON(body, parsed); err != nil {
19+
return nil, err
20+
}
21+
silences = append(silences, parsed)
22+
}
23+
24+
merged, err := mergeV2Silences(silences)
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
if len(merged) != 1 {
30+
return nil, errors.New("unexpected mismatched silence ids")
31+
}
32+
33+
return swag.WriteJSON(merged[0])
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package merger
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestV2SilenceId_ReturnsNewestSilence(t *testing.T) {
10+
11+
// We re-use MergeV2Silences so we rely on that being primarily tested elsewhere.
12+
13+
in := [][]byte{
14+
[]byte(`{"id":"77b580dd-1d9c-4b7e-9bba-13ac173cb4e5","status":{"state":"expired"},` +
15+
`"updatedAt":"2021-04-28T17:31:02.215Z","comment":"This is the newest silence",` +
16+
`"createdBy":"","endsAt":"2021-04-28T17:31:02.215Z","matchers":` +
17+
`[{"isEqual":true,"isRegex":false,"name":"instance","value":"prometheus-one"}],` +
18+
`"startsAt":"2021-04-28T17:31:01.725Z"}`),
19+
[]byte(`{"id":"77b580dd-1d9c-4b7e-9bba-13ac173cb4e5","status":{"state":"expired"},` +
20+
`"updatedAt":"2021-04-28T17:31:02.000Z","comment":"Silence Comment #1",` +
21+
`"createdBy":"","endsAt":"2021-04-28T17:31:02.215Z","matchers":` +
22+
`[{"isEqual":true,"isRegex":false,"name":"instance","value":"prometheus-one"}],` +
23+
`"startsAt":"2021-04-28T17:31:01.725Z"}`),
24+
[]byte(`{"id":"77b580dd-1d9c-4b7e-9bba-13ac173cb4e5","status":{"state":"expired"},` +
25+
`"updatedAt":"2021-04-28T17:31:02.000Z","comment":"Silence Comment #1",` +
26+
`"createdBy":"","endsAt":"2021-04-28T17:31:02.215Z","matchers":` +
27+
`[{"isEqual":true,"isRegex":false,"name":"instance","value":"prometheus-one"}],` +
28+
`"startsAt":"2021-04-28T17:31:01.725Z"}`),
29+
}
30+
31+
expected := []byte(`{"id":"77b580dd-1d9c-4b7e-9bba-13ac173cb4e5","status":{"state":"expired"},` +
32+
`"updatedAt":"2021-04-28T17:31:02.215Z","comment":"This is the newest silence",` +
33+
`"createdBy":"","endsAt":"2021-04-28T17:31:02.215Z","matchers":` +
34+
`[{"isEqual":true,"isRegex":false,"name":"instance","value":"prometheus-one"}],` +
35+
`"startsAt":"2021-04-28T17:31:01.725Z"}`)
36+
37+
out, err := V2SilenceID{}.MergeResponses(in)
38+
require.NoError(t, err)
39+
require.Equal(t, string(expected), string(out))
40+
}
41+
42+
func TestV2SilenceID_InvalidDifferentIDs(t *testing.T) {
43+
44+
// Responses containing silences with different IDs is invalid input.
45+
46+
in := [][]byte{
47+
[]byte(`{"id":"77b580dd-1d9c-4b7e-9bba-13ac173cb4e5","status":{"state":"expired"},` +
48+
`"updatedAt":"2021-04-28T17:31:02.215Z","comment":"Silence Comment #1",` +
49+
`"createdBy":"","endsAt":"2021-04-28T17:31:02.215Z","matchers":` +
50+
`[{"isEqual":true,"isRegex":false,"name":"instance","value":"prometheus-one"}],` +
51+
`"startsAt":"2021-04-28T17:31:01.725Z"}`),
52+
[]byte(`{"id":"261248d1-4ff7-4cf1-9957-850c65f4e48b","status":{"state":"expired"},` +
53+
`"updatedAt":"2021-04-28T17:31:02.082Z","comment":"Silence Comment #3",` +
54+
`"createdBy":"","endsAt":"2021-04-28T17:31:02.082Z","matchers":` +
55+
`[{"isEqual":true,"isRegex":false,"name":"instance","value":"prometheus-one"}],` +
56+
`"startsAt":"2021-04-28T17:31:01.735Z"}`),
57+
}
58+
59+
_, err := V2SilenceID{}.MergeResponses(in)
60+
require.Error(t, err)
61+
}

0 commit comments

Comments
 (0)