Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add users stats http api to ingester #6178

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` #6178

## 1.18.0 in progress

Expand Down
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,23 @@ type Ingester interface {
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
AllUserStatsHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

// RegisterIngester registers the ingesters HTTP and GRPC service
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
client.RegisterIngesterServer(a.server.GRPC, i)

a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")

a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
Expand Down
34 changes: 21 additions & 13 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ha"
"github.com/cortexproject/cortex/pkg/ingester"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
Expand Down Expand Up @@ -1319,7 +1320,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
}

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
if err != nil {
return nil, err
Expand All @@ -1336,7 +1337,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
return nil, err
}

totalStats := &UserStats{}
totalStats := &ingester.UserStats{}
for _, resp := range resps {
r := resp.(*ingester_client.UserStatsResponse)
totalStats.IngestionRate += r.IngestionRate
Expand All @@ -1354,17 +1355,11 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
return totalStats, nil
}

// UserIDStats models ingestion statistics for one user, including the user ID
type UserIDStats struct {
UserID string `json:"userID"`
UserStats
}

// AllUserStats returns statistics about all users.
// Note it does not divide by the ReplicationFactor like UserStats()
func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, error) {
// Add up by user, across all responses from ingesters
perUserTotals := make(map[string]UserStats)
perUserTotals := make(map[string]ingester.UserStats)

req := &ingester_client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
Expand All @@ -1389,28 +1384,41 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
s.RuleIngestionRate += u.Data.RuleIngestionRate
s.NumSeries += u.Data.NumSeries
s.ActiveSeries += u.Data.ActiveSeries
s.LoadBlocks += u.Data.LoadBlocks
perUserTotals[u.UserId] = s
}
}

// Turn aggregated map into a slice for return
response := make([]UserIDStats, 0, len(perUserTotals))
response := make([]ingester.UserIDStats, 0, len(perUserTotals))
for id, stats := range perUserTotals {
response = append(response, UserIDStats{
response = append(response, ingester.UserIDStats{
UserID: id,
UserStats: UserStats{
UserStats: ingester.UserStats{
IngestionRate: stats.IngestionRate,
APIIngestionRate: stats.APIIngestionRate,
RuleIngestionRate: stats.RuleIngestionRate,
NumSeries: stats.NumSeries,
ActiveSeries: stats.ActiveSeries,
LoadBlocks: stats.LoadBlocks,
},
})
}

return response, nil
}

// AllUserStatsHandler shows stats for all users.
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.AllUserStats(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor())
}

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if d.distributorsRing != nil {
d.distributorsRing.ServeHTTP(w, req)
Expand Down
9 changes: 0 additions & 9 deletions pkg/distributor/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ import (
"github.com/cortexproject/cortex/pkg/util"
)

// UserStats models ingestion statistics for one user.
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
NumSeries uint64 `json:"numSeries"`
APIIngestionRate float64 `json:"APIIngestionRate"`
RuleIngestionRate float64 `json:"RuleIngestionRate"`
ActiveSeries uint64 `json:"activeSeries"`
}

// UserStatsHandler handles user stats to the Distributor.
func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.UserStats(r.Context())
Expand Down
Loading
Loading