Skip to content

Commit 329e726

Browse files
committed
Add streaming for ingester metadata APIs
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent d5ce6f4 commit 329e726

17 files changed

+1679
-250
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction. #4707
1515
* [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics. #4708
1616
* [FEATURE] Add shuffle sharding for the compactor #4433
17+
* [FEATURE] Querier: Use streaming for ingester metdata APIs. #4725
1718
* [ENHANCEMENT] Update Go version to 1.17.8. #4602 #4604 #4658
1819
* [ENHANCEMENT] Keep track of discarded samples due to bad relabel configuration in `cortex_discarded_samples_total`. #4503
1920
* [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571

docs/blocks-storage/querier.md

+4
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ querier:
118118
# CLI flag: -querier.ingester-streaming
119119
[ingester_streaming: <boolean> | default = true]
120120

121+
# Use streaming RPCs for metadata APIs from ingester.
122+
# CLI flag: -querier.ingester-metadata-streaming
123+
[ingester_metadata_streaming: <boolean> | default = false]
124+
121125
# Maximum number of samples a single query can load into memory.
122126
# CLI flag: -querier.max-samples
123127
[max_samples: <int> | default = 50000000]

docs/configuration/config-file-reference.md

+4
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,10 @@ The `querier_config` configures the Cortex querier.
871871
# CLI flag: -querier.ingester-streaming
872872
[ingester_streaming: <boolean> | default = true]
873873
874+
# Use streaming RPCs for metadata APIs from ingester.
875+
# CLI flag: -querier.ingester-metadata-streaming
876+
[ingester_metadata_streaming: <boolean> | default = false]
877+
874878
# Maximum number of samples a single query can load into memory.
875879
# CLI flag: -querier.max-samples
876880
[max_samples: <int> | default = 50000000]

pkg/distributor/distributor.go

+145-29
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
io "io"
78
"net/http"
89
"sort"
910
"strings"
@@ -871,8 +872,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
871872
})
872873
}
873874

874-
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
875-
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
875+
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
876876
replicationSet, err := d.GetIngestersForMetadata(ctx)
877877
if err != nil {
878878
return nil, err
@@ -883,16 +883,14 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
883883
return nil, err
884884
}
885885

886-
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
887-
return client.LabelValues(ctx, req)
888-
})
886+
resps, err := f(ctx, replicationSet, req)
889887
if err != nil {
890888
return nil, err
891889
}
892890

893891
valueSet := map[string]struct{}{}
894892
for _, resp := range resps {
895-
for _, v := range resp.(*ingester_client.LabelValuesResponse).LabelValues {
893+
for _, v := range resp.([]string) {
896894
valueSet[v] = struct{}{}
897895
}
898896
}
@@ -908,8 +906,46 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
908906
return values, nil
909907
}
910908

911-
// LabelNames returns all of the label names.
912-
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
909+
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
910+
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
911+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
912+
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
913+
resp, err := client.LabelValues(ctx, req)
914+
if err != nil {
915+
return nil, err
916+
}
917+
return resp.LabelValues, nil
918+
})
919+
}, matchers...)
920+
}
921+
922+
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
923+
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
924+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
925+
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
926+
stream, err := client.LabelValuesStream(ctx, req)
927+
if err != nil {
928+
return nil, err
929+
}
930+
defer stream.CloseSend() //nolint:errcheck
931+
allLabelValues := []string{}
932+
for {
933+
resp, err := stream.Recv()
934+
935+
if err == io.EOF {
936+
break
937+
} else if err != nil {
938+
return nil, err
939+
}
940+
allLabelValues = append(allLabelValues, resp.LabelValues...)
941+
}
942+
943+
return allLabelValues, nil
944+
})
945+
}, matchers...)
946+
}
947+
948+
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
913949
replicationSet, err := d.GetIngestersForMetadata(ctx)
914950
if err != nil {
915951
return nil, err
@@ -919,16 +955,14 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
919955
StartTimestampMs: int64(from),
920956
EndTimestampMs: int64(to),
921957
}
922-
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
923-
return client.LabelNames(ctx, req)
924-
})
958+
resps, err := f(ctx, replicationSet, req)
925959
if err != nil {
926960
return nil, err
927961
}
928962

929963
valueSet := map[string]struct{}{}
930964
for _, resp := range resps {
931-
for _, v := range resp.(*ingester_client.LabelNamesResponse).LabelNames {
965+
for _, v := range resp.([]string) {
932966
valueSet[v] = struct{}{}
933967
}
934968
}
@@ -943,8 +977,106 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
943977
return values, nil
944978
}
945979

980+
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
981+
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
982+
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
983+
stream, err := client.LabelNamesStream(ctx, req)
984+
if err != nil {
985+
return nil, err
986+
}
987+
defer stream.CloseSend() //nolint:errcheck
988+
allLabelNames := []string{}
989+
for {
990+
resp, err := stream.Recv()
991+
992+
if err == io.EOF {
993+
break
994+
} else if err != nil {
995+
return nil, err
996+
}
997+
allLabelNames = append(allLabelNames, resp.LabelNames...)
998+
}
999+
1000+
return allLabelNames, nil
1001+
})
1002+
})
1003+
}
1004+
1005+
// LabelNames returns all of the label names.
1006+
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
1007+
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1008+
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1009+
resp, err := client.LabelNames(ctx, req)
1010+
if err != nil {
1011+
return nil, err
1012+
}
1013+
return resp.LabelNames, nil
1014+
})
1015+
})
1016+
}
1017+
9461018
// MetricsForLabelMatchers gets the metrics that match said matchers
9471019
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
1020+
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1021+
_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1022+
resp, err := client.MetricsForLabelMatchers(ctx, req)
1023+
if err != nil {
1024+
return nil, err
1025+
}
1026+
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
1027+
for _, m := range ms {
1028+
if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
1029+
return nil, err
1030+
}
1031+
mutex.Lock()
1032+
(*metrics)[m.Fingerprint()] = m
1033+
mutex.Unlock()
1034+
}
1035+
1036+
return nil, nil
1037+
})
1038+
1039+
return err
1040+
}, matchers...)
1041+
}
1042+
1043+
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
1044+
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1045+
_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1046+
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
1047+
if err != nil {
1048+
return nil, err
1049+
}
1050+
defer stream.CloseSend() //nolint:errcheck
1051+
for {
1052+
resp, err := stream.Recv()
1053+
1054+
if err == io.EOF {
1055+
break
1056+
} else if err != nil {
1057+
return nil, err
1058+
}
1059+
1060+
for _, metric := range resp.Metric {
1061+
m := cortexpb.FromLabelAdaptersToMetric(metric.Labels)
1062+
1063+
if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
1064+
return nil, err
1065+
}
1066+
mutex.Lock()
1067+
(*metrics)[m.Fingerprint()] = m
1068+
mutex.Unlock()
1069+
}
1070+
}
1071+
1072+
return nil, nil
1073+
})
1074+
1075+
return err
1076+
}, matchers...)
1077+
}
1078+
1079+
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]metric.Metric, error) {
9481080
replicationSet, err := d.GetIngestersForMetadata(ctx)
9491081
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
9501082
if err != nil {
@@ -958,23 +1090,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
9581090
mutex := sync.Mutex{}
9591091
metrics := map[model.Fingerprint]model.Metric{}
9601092

961-
_, err = d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
962-
resp, err := client.MetricsForLabelMatchers(ctx, req)
963-
if err != nil {
964-
return nil, err
965-
}
966-
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
967-
for _, m := range ms {
968-
if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
969-
return nil, err
970-
}
971-
mutex.Lock()
972-
metrics[m.Fingerprint()] = m
973-
mutex.Unlock()
974-
}
975-
976-
return resp, nil
977-
})
1093+
err = f(ctx, replicationSet, req, &metrics, &mutex, queryLimiter)
9781094

9791095
if err != nil {
9801096
return nil, err

pkg/distributor/distributor_test.go

+81-15
Original file line numberDiff line numberDiff line change
@@ -2017,21 +2017,36 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
20172017
require.NoError(t, err)
20182018
}
20192019

2020-
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
2020+
{
2021+
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
20212022

2022-
if testData.expectedErr != nil {
2023-
assert.EqualError(t, err, testData.expectedErr.Error())
2024-
return
2023+
if testData.expectedErr != nil {
2024+
assert.EqualError(t, err, testData.expectedErr.Error())
2025+
return
2026+
}
2027+
2028+
require.NoError(t, err)
2029+
assert.ElementsMatch(t, testData.expectedResult, metrics)
2030+
2031+
// Check how many ingesters have been queried.
2032+
// Due to the quorum the distributor could cancel the last request towards ingesters
2033+
// if all other ones are successful, so we're good either has been queried X or X-1
2034+
// ingesters.
2035+
assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers"))
20252036
}
20262037

2027-
require.NoError(t, err)
2028-
assert.ElementsMatch(t, testData.expectedResult, metrics)
2038+
{
2039+
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, testData.matchers...)
2040+
if testData.expectedErr != nil {
2041+
assert.EqualError(t, err, testData.expectedErr.Error())
2042+
return
2043+
}
20292044

2030-
// Check how many ingesters have been queried.
2031-
// Due to the quorum the distributor could cancel the last request towards ingesters
2032-
// if all other ones are successful, so we're good either has been queried X or X-1
2033-
// ingesters.
2034-
assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers"))
2045+
require.NoError(t, err)
2046+
assert.ElementsMatch(t, testData.expectedResult, metrics)
2047+
2048+
assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchersStream"))
2049+
}
20352050
})
20362051
}
20372052
}
@@ -2661,7 +2676,39 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
26612676
},
26622677
})
26632678
}
2664-
return &stream{
2679+
return &queryStream{
2680+
results: results,
2681+
}, nil
2682+
}
2683+
2684+
func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *client.MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (client.Ingester_MetricsForLabelMatchersStreamClient, error) {
2685+
time.Sleep(i.queryDelay)
2686+
i.Lock()
2687+
defer i.Unlock()
2688+
2689+
i.trackCall("MetricsForLabelMatchersStream")
2690+
2691+
if !i.happy.Load() {
2692+
return nil, errFail
2693+
}
2694+
2695+
_, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
2696+
if err != nil {
2697+
return nil, err
2698+
}
2699+
2700+
results := []*client.MetricsForLabelMatchersStreamResponse{}
2701+
for _, matchers := range multiMatchers {
2702+
for _, ts := range i.timeseries {
2703+
if match(ts.Labels, matchers) {
2704+
results = append(results, &client.MetricsForLabelMatchersStreamResponse{
2705+
Metric: []*cortexpb.Metric{{Labels: ts.Labels}},
2706+
})
2707+
}
2708+
}
2709+
}
2710+
2711+
return &metricsForLabelMatchersStream{
26652712
results: results,
26662713
}, nil
26672714
}
@@ -2742,17 +2789,36 @@ func (i *noopIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt
27422789
return nil, nil
27432790
}
27442791

2745-
type stream struct {
2792+
type queryStream struct {
27462793
grpc.ClientStream
27472794
i int
27482795
results []*client.QueryStreamResponse
27492796
}
27502797

2751-
func (*stream) CloseSend() error {
2798+
func (*queryStream) CloseSend() error {
2799+
return nil
2800+
}
2801+
2802+
func (s *queryStream) Recv() (*client.QueryStreamResponse, error) {
2803+
if s.i >= len(s.results) {
2804+
return nil, io.EOF
2805+
}
2806+
result := s.results[s.i]
2807+
s.i++
2808+
return result, nil
2809+
}
2810+
2811+
type metricsForLabelMatchersStream struct {
2812+
grpc.ClientStream
2813+
i int
2814+
results []*client.MetricsForLabelMatchersStreamResponse
2815+
}
2816+
2817+
func (*metricsForLabelMatchersStream) CloseSend() error {
27522818
return nil
27532819
}
27542820

2755-
func (s *stream) Recv() (*client.QueryStreamResponse, error) {
2821+
func (s *metricsForLabelMatchersStream) Recv() (*client.MetricsForLabelMatchersStreamResponse, error) {
27562822
if s.i >= len(s.results) {
27572823
return nil, io.EOF
27582824
}

0 commit comments

Comments
 (0)