Skip to content

Commit 9bd12de

Browse files
committed
add context cancellation checks on GetSeries
Signed-off-by: Erlan Zholdubai uulu <[email protected]>
1 parent 4b75a5c commit 9bd12de

File tree

3 files changed

+17
-5
lines changed

3 files changed

+17
-5
lines changed

pkg/querier/distributor_queryable.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,12 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
136136
if err != nil {
137137
return storage.ErrSeriesSet(err)
138138
}
139-
return series.MetricsToSeriesSet(sortSeries, ms)
139+
seriesSet, err := series.MetricsToSeriesSet(ctx, sortSeries, ms)
140+
if err != nil {
141+
return storage.ErrSeriesSet(err)
142+
}
143+
144+
return seriesSet
140145
}
141146

142147
return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)

pkg/querier/querier.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select
437437
// we have all the sets from different sources (chunk from store, chunks from ingesters,
438438
// time series from store and time series from ingesters).
439439
// mergeSeriesSets will return sorted set.
440-
return q.mergeSeriesSets(result)
440+
return q.mergeSeriesSets(ctx, result)
441441
}
442442

443443
// LabelValues implements storage.Querier.
@@ -553,7 +553,7 @@ func (querier) Close() error {
553553
return nil
554554
}
555555

556-
func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
556+
func (q querier) mergeSeriesSets(ctx context.Context, sets []storage.SeriesSet) storage.SeriesSet {
557557
// Here we deal with sets that are based on chunks and build single set from them.
558558
// Remaining sets are merged with chunks-based one using storage.NewMergeSeriesSet
559559

@@ -565,6 +565,9 @@ func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
565565

566566
// SeriesSet may have some series backed up by chunks, and some not.
567567
for set.Next() {
568+
if ctx.Err() != nil {
569+
return storage.ErrSeriesSet(ctx.Err())
570+
}
568571
s := set.At()
569572

570573
if sc, ok := s.(SeriesWithChunks); ok {

pkg/querier/series/series_set.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package series
1818

1919
import (
20+
"context"
2021
"sort"
2122

2223
"github.com/prometheus/common/model"
@@ -167,15 +168,18 @@ func MatrixToSeriesSet(sortSeries bool, m model.Matrix) storage.SeriesSet {
167168
}
168169

169170
// MetricsToSeriesSet creates a storage.SeriesSet from a []metric.Metric
170-
func MetricsToSeriesSet(sortSeries bool, ms []metric.Metric) storage.SeriesSet {
171+
func MetricsToSeriesSet(ctx context.Context, sortSeries bool, ms []metric.Metric) (storage.SeriesSet, error) {
171172
series := make([]storage.Series, 0, len(ms))
172173
for _, m := range ms {
174+
if ctx.Err() != nil {
175+
return nil, ctx.Err()
176+
}
173177
series = append(series, &ConcreteSeries{
174178
labels: metricToLabels(m.Metric),
175179
samples: nil,
176180
})
177181
}
178-
return NewConcreteSeriesSet(sortSeries, series)
182+
return NewConcreteSeriesSet(sortSeries, series), nil
179183
}
180184

181185
func metricToLabels(m model.Metric) labels.Labels {

0 commit comments

Comments
 (0)