Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruler: optimised <prefix>/api/v1/rules and <prefix>/api/v1/alerts #3916

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## master / unreleased

* [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
* `cortex_ruler_clients`
* `cortex_ruler_client_request_duration_seconds`

## 1.8.0 in progress

* [CHANGE] Alertmanager: Don't expose cluster information to tenants via the `/alertmanager/api/v1/status` API endpoint when operating with clustering enabled.
Expand Down
8 changes: 8 additions & 0 deletions integration/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ var (
}
}

// RulerShardingFlags returns the CLI flags which enable ruler sharding and
// configure the ruler ring to use the "consul" store at consulAddress.
RulerShardingFlags = func(consulAddress string) map[string]string {
	flags := make(map[string]string, 3)
	flags["-ruler.enable-sharding"] = "true"
	flags["-ruler.ring.store"] = "consul"
	flags["-ruler.ring.consul.hostname"] = consulAddress
	return flags
}

BlocksStorageFlags = func() map[string]string {
return map[string]string{
"-store.engine": blocksStorageEngine,
Expand Down
50 changes: 47 additions & 3 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/prompb"
yaml "gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/ruler"
)

var (
Expand Down Expand Up @@ -213,7 +215,49 @@ type ServerStatus struct {
} `json:"data"`
}

// GetRuleGroups gets the status of an alertmanager instance
// GetPrometheusRules fetches the rule groups from the ruler endpoint
// /api/prom/api/v1/rules, sending the client's org ID in the X-Scope-OrgID
// header and bounding the request by the client's timeout.
//
// It returns an error if the request can't be built or executed, if the ruler
// replies with an HTTP status code other than 200, if the body can't be read or
// decoded as JSON, or if the decoded "status" field is not "success".
func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
	// Create HTTP request
	req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Scope-OrgID", c.orgID)

	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// Execute HTTP request
	res, err := c.httpClient.Do(req.WithContext(ctx))
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	// Surface HTTP-level failures explicitly (including the body, to ease
	// debugging) instead of letting them show up as an opaque decoding error.
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code %d: %s", res.StatusCode, string(body))
	}

	// Decode the response.
	type response struct {
		Status string              `json:"status"`
		Data   ruler.RuleDiscovery `json:"data"`
	}

	decoded := &response{}
	if err := json.Unmarshal(body, decoded); err != nil {
		return nil, err
	}

	if decoded.Status != "success" {
		return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
	}

	return decoded.Data.RuleGroups, nil
}

// GetRuleGroups gets the configured rule groups from the ruler.
func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
// Create HTTP request
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil)
Expand Down Expand Up @@ -247,7 +291,7 @@ func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
return rgs, nil
}

// SetRuleGroup gets the status of an alertmanager instance
// SetRuleGroup configures the provided rulegroup to the ruler.
func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error {
// Create write request
data, err := yaml.Marshal(rulegroup)
Expand Down Expand Up @@ -277,7 +321,7 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
return nil
}

// DeleteRuleGroup gets the status of an alertmanager instance
// DeleteRuleGroup deletes a rule group.
func (c *Client) DeleteRuleGroup(namespace string, groupName string) error {
// Create HTTP request
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil)
Expand Down
82 changes: 82 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -301,7 +303,87 @@ func TestRulerEvaluationDelay(t *testing.T) {
}
}
require.Equal(t, len(series.Samples), inputPos, "expect to have returned all evaluations")
}

// TestRulerSharding starts two rulers with sharding enabled, uploads
// numRulesGroups rule groups through the first one and verifies that:
//   - the two rulers together have loaded all the rules;
//   - each ruler has loaded fewer rules than the total;
//   - the rules endpoint queried on the first ruler returns every configured
//     rule group.
func TestRulerSharding(t *testing.T) {
	const numRulesGroups = 100

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Build the rule groups to upload (a single recording rule each) and keep
	// track of their names for the final assertion.
	var (
		ruleGroups    = make([]rulefmt.RuleGroup, numRulesGroups)
		expectedNames = make([]string, numRulesGroups)
	)
	for idx := range ruleGroups {
		var record, expr yaml.Node
		record.SetString(fmt.Sprintf("rule_%d", idx))
		expr.SetString(strconv.Itoa(idx))

		groupName := fmt.Sprintf("test_%d", idx)
		expectedNames[idx] = groupName
		ruleGroups[idx] = rulefmt.RuleGroup{
			Name: groupName,
			// NOTE(review): bare 60 with no time unit; if Interval is a
			// nanosecond-based duration this is not 60s - confirm.
			Interval: 60,
			Rules:    []rulefmt.RuleNode{{Record: record, Expr: expr}},
		}
	}

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, rulestoreBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Configure the ruler.
	extraFlags := map[string]string{
		// No rule is actually going to be run, so the store-gateway address
		// doesn't have to be a valid one.
		"-querier.store-gateway-addresses": "localhost:12345",
		// With the bucket index enabled, the initial bucket scan is skipped.
		"-blocks-storage.bucket-store.bucket-index.enabled": "true",
	}
	rulerFlags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(false),
		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
		extraFlags,
	)

	// Start rulers.
	ruler1 := e2ecortex.NewRuler("ruler-1", rulerFlags, "")
	ruler2 := e2ecortex.NewRuler("ruler-2", rulerFlags, "")
	rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))

	// Upload every rule group through the first ruler only.
	c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
	require.NoError(t, err)

	for idx := range ruleGroups {
		require.NoError(t, c.SetRuleGroup(ruleGroups[idx], "test"))
	}

	// Wait until the rulers, together, have loaded all rules.
	const rulesMetric = "cortex_prometheus_rule_group_rules"
	require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{rulesMetric}, e2e.WaitMissingMetrics))

	// All rules are loaded, so each single ruler is expected to hold only a
	// part of them: that's the evidence rules have been sharded.
	require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), rulesMetric))
	require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), rulesMetric))

	// The rules returned by the API must match the configured ones.
	actualGroups, err := c.GetPrometheusRules()
	require.NoError(t, err)

	var actualNames []string
	for idx := range actualGroups {
		actualNames = append(actualNames, actualGroups[idx].Name)
	}
	assert.ElementsMatch(t, expectedNames, actualNames)
}

func TestRulerAlertmanager(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
io "io"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down
79 changes: 79 additions & 0 deletions pkg/ruler/client_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ruler

import (
"time"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

// newRulerClientPool builds the pool of ruler clients, named "ruler", whose
// clients are created by newRulerClientFactory. The number of clients in the
// pool is tracked by the cortex_ruler_clients gauge, registered on reg.
func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) *client.Pool {
	// The gauge is registered before the factory is built, so that metrics
	// get registered on reg in a stable order.
	clientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Name: "cortex_ruler_clients",
		Help: "The current number of ruler clients in the pool.",
	})
	factory := newRulerClientFactory(clientCfg, reg)

	// Hard-coded sane defaults are preferred over exposing further config options.
	cfg := client.PoolConfig{
		HealthCheckEnabled: true,
		HealthCheckTimeout: 10 * time.Second,
		CheckInterval:      time.Minute,
	}

	return client.NewPool("ruler", cfg, nil, factory, clientsGauge, logger)
}

// newRulerClientFactory returns a factory which dials a new ruler client for
// the given address. All clients created by the returned factory share the
// same cortex_ruler_client_request_duration_seconds histogram, which is
// registered on reg once, when this function is called.
func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
	histogramOpts := prometheus.HistogramOpts{
		Name:    "cortex_ruler_client_request_duration_seconds",
		Help:    "Time spent executing requests to the ruler.",
		Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
	}
	durations := promauto.With(reg).NewHistogramVec(histogramOpts, []string{"operation", "status_code"})

	factory := func(addr string) (client.PoolClient, error) {
		return dialRulerClient(clientCfg, addr, durations)
	}
	return factory
}

// dialRulerClient opens a gRPC connection to the ruler at addr, using the dial
// options derived from clientCfg and instrumented with requestDuration, and
// wraps it in a rulerExtendedClient exposing both the ruler and the health
// check gRPC clients.
func dialRulerClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) {
	dialOpts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
	if err != nil {
		return nil, err
	}

	grpcConn, dialErr := grpc.Dial(addr, dialOpts...)
	if dialErr != nil {
		return nil, errors.Wrapf(dialErr, "failed to dial ruler %s", addr)
	}

	// Both clients share the same underlying connection.
	extended := &rulerExtendedClient{conn: grpcConn}
	extended.RulerClient = NewRulerClient(grpcConn)
	extended.HealthClient = grpc_health_v1.NewHealthClient(grpcConn)
	return extended, nil
}

// rulerExtendedClient bundles the ruler gRPC client and the gRPC health check
// client together with the connection it holds, so that the connection can be
// closed and its target address reported.
type rulerExtendedClient struct {
	RulerClient
	grpc_health_v1.HealthClient
	conn *grpc.ClientConn
}

// RemoteAddress returns the target of the underlying gRPC connection.
func (c *rulerExtendedClient) RemoteAddress() string {
	return c.conn.Target()
}

// String returns the same value as RemoteAddress.
func (c *rulerExtendedClient) String() string {
	return c.conn.Target()
}

// Close closes the underlying gRPC connection.
func (c *rulerExtendedClient) Close() error {
	return c.conn.Close()
}
68 changes: 68 additions & 0 deletions pkg/ruler/client_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ruler

import (
"context"
"net"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

// Test_newRulerClientFactory starts a gRPC server exposing a mocked ruler
// service, creates two clients through the factory returned by
// newRulerClientFactory, issues one Rules() request with each of them and
// checks that both requests have been tracked by the request duration
// histogram registered by the factory.
func Test_newRulerClientFactory(t *testing.T) {
	// Create a GRPC server used to query the mocked service.
	grpcServer := grpc.NewServer()

	srv := &mockRulerServer{}
	RegisterRulerServer(grpcServer, srv)

	listener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	// Serve in the background, handing the result back to the test goroutine:
	// require.* calls t.FailNow(), which must only be called from the goroutine
	// running the test, so the error can't be asserted inside the goroutine.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- grpcServer.Serve(listener)
	}()

	// Registered before the clients are created, so it runs after their
	// deferred Close(). Waiting for Serve() to return guarantees the goroutine
	// doesn't outlive the test.
	defer func() {
		grpcServer.GracefulStop()
		assert.NoError(t, <-serveErr)
	}()

	// Create a client factory and query back the mocked service
	// with different clients.
	cfg := grpcclient.Config{}
	flagext.DefaultValues(&cfg)

	reg := prometheus.NewPedanticRegistry()
	factory := newRulerClientFactory(cfg, reg)

	for i := 0; i < 2; i++ {
		client, err := factory(listener.Addr().String())
		require.NoError(t, err)
		// Deferred to the end of the test (not of the iteration).
		defer client.Close() //nolint:errcheck

		ctx := user.InjectOrgID(context.Background(), "test")
		_, err = client.(*rulerExtendedClient).Rules(ctx, &RulesRequest{})
		assert.NoError(t, err)
	}

	// Assert on the request duration metric, but since it's a duration histogram and
	// we can't predict the exact time it took, we need to workaround it: only the
	// metric name, type, number of series and number of samples are checked.
	metrics, err := reg.Gather()
	require.NoError(t, err)

	assert.Len(t, metrics, 1)
	assert.Equal(t, "cortex_ruler_client_request_duration_seconds", metrics[0].GetName())
	assert.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
	assert.Len(t, metrics[0].GetMetric(), 1)
	assert.Equal(t, uint64(2), metrics[0].GetMetric()[0].GetHistogram().GetSampleCount())
}

// mockRulerServer is a stateless ruler server stub used by tests.
type mockRulerServer struct{}

// Rules ignores its input and always replies with an empty response and no error.
func (*mockRulerServer) Rules(_ context.Context, _ *RulesRequest) (*RulesResponse, error) {
	resp := &RulesResponse{}
	return resp, nil
}
Loading