Skip to content

Commit ebef1e1

Browse files
authored
Ruler: optimised <prefix>/api/v1/rules and <prefix>/api/v1/alerts (#3916)
* Use a grpc clients pool in the ruler Signed-off-by: Marco Pracucci <[email protected]>
* Concurrently fetch rules from all rulers Signed-off-by: Marco Pracucci <[email protected]>
* Added subservices manager Signed-off-by: Marco Pracucci <[email protected]>
* Fixed Rules() grpc call Signed-off-by: Marco Pracucci <[email protected]>
* Added integration test Signed-off-by: Marco Pracucci <[email protected]>
* Added CHANGELOG entry Signed-off-by: Marco Pracucci <[email protected]>
* Addressed review comments Signed-off-by: Marco Pracucci <[email protected]>
* Fixed CHANGELOG Signed-off-by: Marco Pracucci <[email protected]>
1 parent 2dae12a commit ebef1e1

File tree

8 files changed

+345
-42
lines changed

8 files changed

+345
-42
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## master / unreleased
44

5+
* [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
6+
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
7+
* `cortex_ruler_clients`
8+
* `cortex_ruler_client_request_duration_seconds`
9+
510
## 1.8.0 in progress
611

712
* [CHANGE] Alertmanager: Don't expose cluster information to tenants via the `/alertmanager/api/v1/status` API endpoint when operating with clustering enabled.

integration/configs.go

+8
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,14 @@ var (
168168
}
169169
}
170170

171+
// RulerShardingFlags returns the flags that enable ruler sharding and point
// the ruler ring at the Consul instance identified by consulAddress.
RulerShardingFlags = func(consulAddress string) map[string]string {
	flags := make(map[string]string, 3)
	flags["-ruler.enable-sharding"] = "true"
	flags["-ruler.ring.store"] = "consul"
	flags["-ruler.ring.consul.hostname"] = consulAddress
	return flags
}
178+
171179
BlocksStorageFlags = func() map[string]string {
172180
return map[string]string{
173181
"-store.engine": blocksStorageEngine,

integration/e2ecortex/client.go

+47-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/prometheus/prometheus/pkg/rulefmt"
2424
"github.com/prometheus/prometheus/prompb"
2525
yaml "gopkg.in/yaml.v3"
26+
27+
"github.com/cortexproject/cortex/pkg/ruler"
2628
)
2729

2830
var (
@@ -213,7 +215,49 @@ type ServerStatus struct {
213215
} `json:"data"`
214216
}
215217

216-
// GetRuleGroups gets the status of an alertmanager instance
218+
// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
219+
func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
220+
// Create HTTP request
221+
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
222+
if err != nil {
223+
return nil, err
224+
}
225+
req.Header.Set("X-Scope-OrgID", c.orgID)
226+
227+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
228+
defer cancel()
229+
230+
// Execute HTTP request
231+
res, err := c.httpClient.Do(req.WithContext(ctx))
232+
if err != nil {
233+
return nil, err
234+
}
235+
defer res.Body.Close()
236+
237+
body, err := ioutil.ReadAll(res.Body)
238+
if err != nil {
239+
return nil, err
240+
}
241+
242+
// Decode the response.
243+
type response struct {
244+
Status string `json:"status"`
245+
Data ruler.RuleDiscovery `json:"data"`
246+
}
247+
248+
decoded := &response{}
249+
if err := json.Unmarshal(body, decoded); err != nil {
250+
return nil, err
251+
}
252+
253+
if decoded.Status != "success" {
254+
return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
255+
}
256+
257+
return decoded.Data.RuleGroups, nil
258+
}
259+
260+
// GetRuleGroups gets the configured rule groups from the ruler.
217261
func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
218262
// Create HTTP request
219263
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil)
@@ -247,7 +291,7 @@ func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
247291
return rgs, nil
248292
}
249293

250-
// SetRuleGroup gets the status of an alertmanager instance
294+
// SetRuleGroup configures the provided rulegroup to the ruler.
251295
func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error {
252296
// Create write request
253297
data, err := yaml.Marshal(rulegroup)
@@ -277,7 +321,7 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
277321
return nil
278322
}
279323

280-
// DeleteRuleGroup gets the status of an alertmanager instance
324+
// DeleteRuleGroup deletes a rule group.
281325
func (c *Client) DeleteRuleGroup(namespace string, groupName string) error {
282326
// Create HTTP request
283327
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil)

integration/ruler_test.go

+82
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"os"
1313
"path/filepath"
14+
"strconv"
1415
"strings"
1516
"testing"
1617
"time"
@@ -20,6 +21,7 @@ import (
2021
"github.com/prometheus/prometheus/pkg/rulefmt"
2122
"github.com/prometheus/prometheus/pkg/value"
2223
"github.com/prometheus/prometheus/prompb"
24+
"github.com/stretchr/testify/assert"
2325
"github.com/stretchr/testify/require"
2426
"gopkg.in/yaml.v3"
2527

@@ -301,7 +303,87 @@ func TestRulerEvaluationDelay(t *testing.T) {
301303
}
302304
}
303305
require.Equal(t, len(series.Samples), inputPos, "expect to have returned all evaluations")
306+
}
307+
308+
func TestRulerSharding(t *testing.T) {
309+
const numRulesGroups = 100
310+
311+
s, err := e2e.NewScenario(networkName)
312+
require.NoError(t, err)
313+
defer s.Close()
314+
315+
// Generate multiple rule groups, with 1 rule each.
316+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
317+
expectedNames := make([]string, numRulesGroups)
318+
for i := 0; i < numRulesGroups; i++ {
319+
var recordNode yaml.Node
320+
var exprNode yaml.Node
321+
322+
recordNode.SetString(fmt.Sprintf("rule_%d", i))
323+
exprNode.SetString(strconv.Itoa(i))
324+
ruleName := fmt.Sprintf("test_%d", i)
325+
326+
expectedNames[i] = ruleName
327+
ruleGroups[i] = rulefmt.RuleGroup{
328+
Name: ruleName,
329+
Interval: 60,
330+
Rules: []rulefmt.RuleNode{{
331+
Record: recordNode,
332+
Expr: exprNode,
333+
}},
334+
}
335+
}
336+
337+
// Start dependencies.
338+
consul := e2edb.NewConsul()
339+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
340+
require.NoError(t, s.StartAndWaitReady(consul, minio))
304341

342+
// Configure the ruler.
343+
rulerFlags := mergeFlags(
344+
BlocksStorageFlags(),
345+
RulerFlags(false),
346+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
347+
map[string]string{
348+
// Since we're not going to run any rule, we don't need the
349+
// store-gateway to be configured to a valid address.
350+
"-querier.store-gateway-addresses": "localhost:12345",
351+
// Enable the bucket index so we can skip the initial bucket scan.
352+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
353+
},
354+
)
355+
356+
// Start rulers.
357+
ruler1 := e2ecortex.NewRuler("ruler-1", rulerFlags, "")
358+
ruler2 := e2ecortex.NewRuler("ruler-2", rulerFlags, "")
359+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
360+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
361+
362+
// Upload rule groups to one of the rulers.
363+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
364+
require.NoError(t, err)
365+
366+
for _, ruleGroup := range ruleGroups {
367+
require.NoError(t, c.SetRuleGroup(ruleGroup, "test"))
368+
}
369+
370+
// Wait until rulers have loaded all rules.
371+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
372+
373+
// Since rulers have loaded all rules, we expect that rules have been sharded
374+
// between the two rulers.
375+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
376+
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
377+
378+
// Fetch the rules and ensure they match the configured ones.
379+
actualGroups, err := c.GetPrometheusRules()
380+
require.NoError(t, err)
381+
382+
var actualNames []string
383+
for _, group := range actualGroups {
384+
actualNames = append(actualNames, group.Name)
385+
}
386+
assert.ElementsMatch(t, expectedNames, actualNames)
305387
}
306388

307389
func TestRulerAlertmanager(t *testing.T) {

pkg/ruler/api_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
io "io"
7+
"io"
88
"io/ioutil"
99
"net/http"
1010
"net/http/httptest"

pkg/ruler/client_pool.go

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package ruler
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-kit/kit/log"
7+
"github.com/pkg/errors"
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/client_golang/prometheus/promauto"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/health/grpc_health_v1"
12+
13+
"github.com/cortexproject/cortex/pkg/ring/client"
14+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
15+
)
16+
17+
func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) *client.Pool {
18+
// We prefer sane defaults instead of exposing further config options.
19+
poolCfg := client.PoolConfig{
20+
CheckInterval: time.Minute,
21+
HealthCheckEnabled: true,
22+
HealthCheckTimeout: 10 * time.Second,
23+
}
24+
25+
clientsCount := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
26+
Name: "cortex_ruler_clients",
27+
Help: "The current number of ruler clients in the pool.",
28+
})
29+
30+
return client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger)
31+
}
32+
33+
func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
34+
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
35+
Name: "cortex_ruler_client_request_duration_seconds",
36+
Help: "Time spent executing requests to the ruler.",
37+
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
38+
}, []string{"operation", "status_code"})
39+
40+
return func(addr string) (client.PoolClient, error) {
41+
return dialRulerClient(clientCfg, addr, requestDuration)
42+
}
43+
}
44+
45+
func dialRulerClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) {
46+
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
conn, err := grpc.Dial(addr, opts...)
52+
if err != nil {
53+
return nil, errors.Wrapf(err, "failed to dial ruler %s", addr)
54+
}
55+
56+
return &rulerExtendedClient{
57+
RulerClient: NewRulerClient(conn),
58+
HealthClient: grpc_health_v1.NewHealthClient(conn),
59+
conn: conn,
60+
}, nil
61+
}
62+
63+
type rulerExtendedClient struct {
64+
RulerClient
65+
grpc_health_v1.HealthClient
66+
conn *grpc.ClientConn
67+
}
68+
69+
func (c *rulerExtendedClient) Close() error {
70+
return c.conn.Close()
71+
}
72+
73+
func (c *rulerExtendedClient) String() string {
74+
return c.RemoteAddress()
75+
}
76+
77+
func (c *rulerExtendedClient) RemoteAddress() string {
78+
return c.conn.Target()
79+
}

pkg/ruler/client_pool_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"net"
6+
"testing"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
dto "github.com/prometheus/client_model/go"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"github.com/weaveworks/common/user"
13+
"google.golang.org/grpc"
14+
15+
"github.com/cortexproject/cortex/pkg/util/flagext"
16+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
17+
)
18+
19+
func Test_newRulerClientFactory(t *testing.T) {
20+
// Create a GRPC server used to query the mocked service.
21+
grpcServer := grpc.NewServer()
22+
defer grpcServer.GracefulStop()
23+
24+
srv := &mockRulerServer{}
25+
RegisterRulerServer(grpcServer, srv)
26+
27+
listener, err := net.Listen("tcp", "localhost:0")
28+
require.NoError(t, err)
29+
30+
go func() {
31+
require.NoError(t, grpcServer.Serve(listener))
32+
}()
33+
34+
// Create a client factory and query back the mocked service
35+
// with different clients.
36+
cfg := grpcclient.Config{}
37+
flagext.DefaultValues(&cfg)
38+
39+
reg := prometheus.NewPedanticRegistry()
40+
factory := newRulerClientFactory(cfg, reg)
41+
42+
for i := 0; i < 2; i++ {
43+
client, err := factory(listener.Addr().String())
44+
require.NoError(t, err)
45+
defer client.Close() //nolint:errcheck
46+
47+
ctx := user.InjectOrgID(context.Background(), "test")
48+
_, err = client.(*rulerExtendedClient).Rules(ctx, &RulesRequest{})
49+
assert.NoError(t, err)
50+
}
51+
52+
// Assert on the request duration metric, but since it's a duration histogram and
53+
// we can't predict the exact time it took, we need to workaround it.
54+
metrics, err := reg.Gather()
55+
require.NoError(t, err)
56+
57+
assert.Len(t, metrics, 1)
58+
assert.Equal(t, "cortex_ruler_client_request_duration_seconds", metrics[0].GetName())
59+
assert.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
60+
assert.Len(t, metrics[0].GetMetric(), 1)
61+
assert.Equal(t, uint64(2), metrics[0].GetMetric()[0].GetHistogram().GetSampleCount())
62+
}
63+
64+
type mockRulerServer struct{}
65+
66+
func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
67+
return &RulesResponse{}, nil
68+
}

0 commit comments

Comments
 (0)