Skip to content

Commit 952dd9f

Browse files
committed
Support metadata federated query
Signed-off-by: SungJin1212 <[email protected]>
1 parent 8a46d20 commit 952dd9f

File tree

7 files changed

+275
-6
lines changed

7 files changed

+275
-6
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
1919
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
2020
* [FEATURE] Ruler: Add support for per-user external labels #6340
21+
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
2122
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flag to configure the number of workers processing a federated query, and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
2223
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2324
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388

pkg/api/handlers.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func NewQuerierHandler(
162162
queryable storage.SampleAndChunkQueryable,
163163
exemplarQueryable storage.ExemplarQueryable,
164164
engine promql.QueryEngine,
165-
distributor Distributor,
165+
metadataQuerier querier.MetadataQuerier,
166166
reg prometheus.Registerer,
167167
logger log.Logger,
168168
) http.Handler {
@@ -266,7 +266,7 @@ func NewQuerierHandler(
266266

267267
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
268268
// https://github.com/prometheus/prometheus/pull/7125/files
269-
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor))
269+
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
270270
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
271271
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
272272
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
@@ -279,7 +279,7 @@ func NewQuerierHandler(
279279

280280
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
281281
// https://github.com/prometheus/prometheus/pull/7125/files
282-
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor))
282+
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
283283
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
284284
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
285285
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)

pkg/cortex/cortex.go

+1
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ type Cortex struct {
311311
RuntimeConfig *runtimeconfig.Manager
312312
QuerierQueryable prom_storage.SampleAndChunkQueryable
313313
ExemplarQueryable prom_storage.ExemplarQueryable
314+
MetadataQuerier querier.MetadataQuerier
314315
QuerierEngine promql.QueryEngine
315316
QueryFrontendTripperware tripperware.Tripperware
316317

pkg/cortex/modules.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,9 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
260260
// Create a querier queryable and PromQL engine
261261
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger)
262262

263+
// Use distributor as default MetadataQuerier
264+
t.MetadataQuerier = t.Distributor
265+
263266
// Register the default endpoints that are always enabled for the querier module
264267
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)
265268

@@ -274,6 +277,8 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
274277
// federation.
275278
byPassForSingleQuerier := true
276279
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
280+
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)
281+
277282
}
278283
return nil, nil
279284
}
@@ -335,7 +340,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
335340
t.QuerierQueryable,
336341
t.ExemplarQueryable,
337342
t.QuerierEngine,
338-
t.Distributor,
343+
t.MetadataQuerier,
339344
prometheus.DefaultRegisterer,
340345
util_log.Logger,
341346
)

pkg/querier/metadata_handler.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package querier
22

33
import (
4+
"context"
45
"net/http"
56

7+
"github.com/prometheus/prometheus/scrape"
8+
69
"github.com/cortexproject/cortex/pkg/util"
710
)
811

12+
// MetadataQuerier is the source of metric metadata consumed by MetadataHandler.
type MetadataQuerier interface {
13+
// MetricsMetadata returns the metric metadata for the request described by
// ctx, or an error if it cannot be fetched.
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
14+
}
15+
916
type metricMetadata struct {
1017
Type string `json:"type"`
1118
Help string `json:"help"`
@@ -25,9 +32,9 @@ type metadataResult struct {
2532

2633
// MetadataHandler returns metric metadata held by Cortex for a given tenant.
2734
// It is kept and returned as a set.
28-
func MetadataHandler(d Distributor) http.Handler {
35+
func MetadataHandler(m MetadataQuerier) http.Handler {
2936
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
30-
resp, err := d.MetricsMetadata(r.Context())
37+
resp, err := m.MetricsMetadata(r.Context())
3138
if err != nil {
3239
w.WriteHeader(http.StatusBadRequest)
3340
util.WriteJSONResponse(w, metadataResult{Status: statusError, Error: err.Error()})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package tenantfederation
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/pkg/errors"
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/client_golang/prometheus/promauto"
10+
"github.com/prometheus/prometheus/scrape"
11+
"github.com/weaveworks/common/user"
12+
13+
"github.com/cortexproject/cortex/pkg/querier"
14+
"github.com/cortexproject/cortex/pkg/tenant"
15+
"github.com/cortexproject/cortex/pkg/util/concurrency"
16+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
17+
)
18+
19+
// NewMetadataQuerier returns a MetadataQuerier that merges metric
20+
// metadata for multiple tenants.
21+
func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg prometheus.Registerer) querier.MetadataQuerier {
22+
return &mergeMetadataQuerier{
23+
upstream: upstream,
24+
maxConcurrent: maxConcurrent,
25+
26+
tenantsPerMetadataQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
27+
Namespace: "cortex",
28+
Name: "querier_federated_tenants_per_metadata_query",
29+
Help: "Number of tenants per metadata query.",
30+
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
31+
}),
32+
}
33+
}
34+
35+
// mergeMetadataQuerier is the querier.MetadataQuerier built by
// NewMetadataQuerier; it queries upstream per tenant and merges the results.
type mergeMetadataQuerier struct {
36+
// maxConcurrent is passed to concurrency.ForEach when fanning out per tenant.
maxConcurrent int
37+
// tenantsPerMetadataQuery observes the number of tenant IDs of each call.
tenantsPerMetadataQuery prometheus.Histogram
38+
// upstream is the querier that is asked for each tenant's metadata.
upstream querier.MetadataQuerier
39+
}
40+
41+
// metadataSelectJob describes one per-tenant metadata query run by
// mergeMetadataQuerier.MetricsMetadata.
type metadataSelectJob struct {
42+
// pos is the index in the results slice where this job stores its output.
pos int
43+
// querier is the MetadataQuerier the job calls.
querier querier.MetadataQuerier
44+
// id is the tenant ID injected as the org ID for the call.
id string
45+
}
46+
47+
// MetricsMetadata returns aggregated metadata for multiple tenants
48+
func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
49+
log, ctx := spanlogger.New(ctx, "mergeMetadataQuerier.MetricsMetadata")
50+
defer log.Span.Finish()
51+
52+
tenantIds, err := tenant.TenantIDs(ctx)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
m.tenantsPerMetadataQuery.Observe(float64(len(tenantIds)))
58+
59+
if len(tenantIds) == 1 {
60+
return m.upstream.MetricsMetadata(ctx)
61+
}
62+
63+
jobs := make([]interface{}, len(tenantIds))
64+
results := make([][]scrape.MetricMetadata, len(tenantIds))
65+
66+
var jobPos int
67+
for _, tenantId := range tenantIds {
68+
jobs[jobPos] = &metadataSelectJob{
69+
pos: jobPos,
70+
querier: m.upstream,
71+
id: tenantId,
72+
}
73+
jobPos++
74+
}
75+
76+
run := func(ctx context.Context, jobIntf interface{}) error {
77+
job, ok := jobIntf.(*metadataSelectJob)
78+
if !ok {
79+
return fmt.Errorf("unexpected type %T", jobIntf)
80+
}
81+
82+
res, err := job.querier.MetricsMetadata(user.InjectOrgID(ctx, job.id))
83+
if err != nil {
84+
return errors.Wrapf(err, "error exemplars querying %s %s", job.id, err)
85+
}
86+
87+
results[job.pos] = res
88+
return nil
89+
}
90+
91+
err = concurrency.ForEach(ctx, jobs, m.maxConcurrent, run)
92+
if err != nil {
93+
return nil, err
94+
}
95+
96+
// deduplicate for the same MetricMetadata across all tenants
97+
var ret []scrape.MetricMetadata
98+
deduplicated := make(map[scrape.MetricMetadata]struct{})
99+
for _, metadata := range results {
100+
for _, m := range metadata {
101+
if _, ok := deduplicated[m]; !ok {
102+
ret = append(ret, m)
103+
deduplicated[m] = struct{}{}
104+
}
105+
}
106+
}
107+
108+
return ret, nil
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package tenantfederation
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/testutil"
11+
"github.com/prometheus/prometheus/scrape"
12+
"github.com/stretchr/testify/require"
13+
"github.com/weaveworks/common/user"
14+
15+
"github.com/cortexproject/cortex/pkg/tenant"
16+
)
17+
18+
// Expected text exposition of the
// cortex_querier_federated_tenants_per_metadata_query histogram, compared
// against the test registry with testutil.GatherAndCompare.
var (
19+
// One observation of value 1, i.e. a query for a single tenant.
expectedSingleTenantsMetadataMetrics = `
20+
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query.
21+
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram
22+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 1
23+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1
24+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1
25+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1
26+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1
27+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1
28+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1
29+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1
30+
cortex_querier_federated_tenants_per_metadata_query_sum 1
31+
cortex_querier_federated_tenants_per_metadata_query_count 1
32+
`
33+
34+
// One observation of value 2, i.e. a query spanning two tenants.
expectedTwoTenantsMetadataMetrics = `
35+
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query.
36+
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram
37+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 0
38+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1
39+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1
40+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1
41+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1
42+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1
43+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1
44+
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1
45+
cortex_querier_federated_tenants_per_metadata_query_sum 2
46+
cortex_querier_federated_tenants_per_metadata_query_count 1
47+
`
48+
)
49+
50+
// mockMetadataQuerier is a test double that serves canned metadata per tenant.
type mockMetadataQuerier struct {
51+
// tenantIdToMetadata maps a tenant ID to the metadata returned for it.
tenantIdToMetadata map[string][]scrape.MetricMetadata
52+
}
53+
54+
func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
55+
// Due to lint check for `ensure the query path is supporting multiple tenants`
56+
ids, err := tenant.TenantIDs(ctx)
57+
if err != nil {
58+
return nil, err
59+
}
60+
61+
id := ids[0]
62+
if res, ok := m.tenantIdToMetadata[id]; !ok {
63+
return nil, fmt.Errorf("tenant not found, tenantId: %s", id)
64+
} else {
65+
return res, nil
66+
}
67+
}
68+
69+
func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) {
70+
// set a multi tenant resolver
71+
tenant.WithDefaultResolver(tenant.NewMultiResolver())
72+
73+
tests := []struct {
74+
name string
75+
tenantIdToMetadata map[string][]scrape.MetricMetadata
76+
orgId string
77+
expectedResults []scrape.MetricMetadata
78+
expectedMetrics string
79+
}{
80+
{
81+
name: "single tenant",
82+
tenantIdToMetadata: map[string][]scrape.MetricMetadata{
83+
"user-1": {
84+
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
85+
},
86+
},
87+
orgId: "user-1",
88+
expectedResults: []scrape.MetricMetadata{
89+
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
90+
},
91+
expectedMetrics: expectedSingleTenantsMetadataMetrics,
92+
},
93+
{
94+
name: "should be merged two tenants results",
95+
tenantIdToMetadata: map[string][]scrape.MetricMetadata{
96+
"user-1": {
97+
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
98+
},
99+
"user-2": {
100+
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
101+
{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""},
102+
},
103+
},
104+
orgId: "user-1|user-2",
105+
expectedResults: []scrape.MetricMetadata{
106+
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
107+
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
108+
{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""},
109+
},
110+
expectedMetrics: expectedTwoTenantsMetadataMetrics,
111+
},
112+
{
113+
name: "should be deduplicated when the same metadata exist",
114+
tenantIdToMetadata: map[string][]scrape.MetricMetadata{
115+
"user-1": {
116+
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
117+
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
118+
},
119+
"user-2": {
120+
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
121+
},
122+
},
123+
orgId: "user-1|user-2",
124+
expectedResults: []scrape.MetricMetadata{
125+
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
126+
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
127+
},
128+
expectedMetrics: expectedTwoTenantsMetadataMetrics,
129+
},
130+
}
131+
132+
for _, test := range tests {
133+
t.Run(test.name, func(t *testing.T) {
134+
reg := prometheus.NewPedanticRegistry()
135+
upstream := mockMetadataQuerier{
136+
tenantIdToMetadata: test.tenantIdToMetadata,
137+
}
138+
139+
mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg)
140+
metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId))
141+
require.NoError(t, err)
142+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query"))
143+
require.Equal(t, test.expectedResults, metadata)
144+
})
145+
}
146+
}

0 commit comments

Comments
 (0)