-
Notifications
You must be signed in to change notification settings - Fork 817
/
Copy pathchunk_store_queryable.go
108 lines (90 loc) · 3.22 KB
/
chunk_store_queryable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package querier
import (
"context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/chunkstore"
seriesset "github.com/cortexproject/cortex/pkg/querier/series"
)
// chunkIteratorFunc turns a slice of chunks plus a pair of time bounds
// (from, through) into a chunkenc.Iterator. How the bounds are applied is
// left to the concrete implementation.
type chunkIteratorFunc func(chunks []chunk.Chunk, from model.Time, through model.Time) chunkenc.Iterator
// newChunkStoreQueryable adapts a chunk store to the storage.Queryable
// interface. Each querier it produces remembers the context and the
// mint/maxt bounds it was requested with, and carries the supplied
// chunkIteratorFunc for building series iterators later on.
// Creating a querier never fails: the returned error is always nil.
func newChunkStoreQueryable(store chunkstore.ChunkStore, chunkIteratorFunc chunkIteratorFunc) storage.Queryable {
	makeQuerier := func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
		querier := &chunkStoreQuerier{
			ctx:               ctx,
			mint:              mint,
			maxt:              maxt,
			store:             store,
			chunkIteratorFunc: chunkIteratorFunc,
		}
		return querier, nil
	}
	return storage.QueryableFunc(makeQuerier)
}
// chunkStoreQuerier is the querier handed out by newChunkStoreQueryable.
// It holds everything Select needs to fetch chunks and wrap them as series.
type chunkStoreQuerier struct {
	// store is where Select fetches chunks from.
	store chunkstore.ChunkStore
	// chunkIteratorFunc is passed on to every series built from the fetched chunks.
	chunkIteratorFunc chunkIteratorFunc
	// ctx is the context captured when the querier was created; Select reads
	// the org ID from it and forwards it to the store.
	ctx context.Context
	// mint is the lower time bound the querier was created with.
	mint int64
	// maxt is the upper time bound the querier was created with.
	maxt int64
}
// Select implements storage.Querier interface.
// The bool passed is ignored because the series is always sorted.
//
// The org ID is extracted from the querier's context and used to fetch the
// chunks matching the given matchers from the store. The fetch range is
// taken from the select hints when they are provided; when sp is nil the
// querier's own mint/maxt are used instead. The fetched chunks are then
// grouped into series by partitionChunks, using the querier's mint/maxt.
//
// Any failure (missing org ID, store error) is reported through an error
// series set rather than a panic or a separate error return.
func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	userID, err := user.ExtractOrgID(q.ctx)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}

	// Callers may pass nil hints; dereferencing them unconditionally would
	// panic, so default to the range this querier was created for.
	from, through := q.mint, q.maxt
	if sp != nil {
		from, through = sp.Start, sp.End
	}

	chunks, err := q.store.Get(q.ctx, userID, model.Time(from), model.Time(through), matchers...)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}

	return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc)
}
// partitionChunks groups chunks by the fingerprint of their metric and wraps
// each group in a chunkSeries carrying the given mint, maxt and iteratorFunc.
// The labels of a series are taken from the first chunk of its group.
//
// Series in the returned set are sorted alphabetically by labels.
// NOTE(review): the groups are collected by walking a map, whose iteration
// order is unspecified, so the ordering presumably comes from
// seriesset.NewConcreteSeriesSet — confirm there.
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
	grouped := make(map[model.Fingerprint][]chunk.Chunk)
	for idx := range chunks {
		key := client.Fingerprint(chunks[idx].Metric)
		grouped[key] = append(grouped[key], chunks[idx])
	}

	result := make([]storage.Series, 0, len(grouped))
	for _, group := range grouped {
		entry := &chunkSeries{
			labels:            group[0].Metric,
			chunks:            group,
			chunkIteratorFunc: iteratorFunc,
			mint:              mint,
			maxt:              maxt,
		}
		result = append(result, entry)
	}

	return seriesset.NewConcreteSeriesSet(result)
}
// LabelValues is a no-op for this querier: it does not consult the store and
// always returns nil values, nil warnings and a nil error.
func (*chunkStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
	return nil, nil, nil
}
// LabelNames is a no-op for this querier: it does not consult the store and
// always returns nil names, nil warnings and a nil error.
func (*chunkStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
	return nil, nil, nil
}
// Close does nothing and always returns nil; the querier holds no resources
// that it releases here.
func (*chunkStoreQuerier) Close() error {
	return nil
}
// chunkSeries is a single series backed by a group of chunks.
// Implements SeriesWithChunks
type chunkSeries struct {
	// labels is the label set reported by Labels.
	labels labels.Labels
	// chunks is the group of chunks backing this series, returned by Chunks.
	chunks []chunk.Chunk
	// chunkIteratorFunc is invoked by Iterator to build an iterator over chunks.
	chunkIteratorFunc chunkIteratorFunc
	// mint is the lower time bound handed to chunkIteratorFunc.
	mint int64
	// maxt is the upper time bound handed to chunkIteratorFunc.
	maxt int64
}
// Labels returns the label set stored on the series.
func (s *chunkSeries) Labels() labels.Labels { return s.labels }
// Iterator returns a new iterator of the data of the series, built by calling
// the series' chunkIteratorFunc with its chunks and its mint/maxt bounds
// converted to model.Time.
func (s *chunkSeries) Iterator() chunkenc.Iterator {
	from := model.Time(s.mint)
	through := model.Time(s.maxt)
	return s.chunkIteratorFunc(s.chunks, from, through)
}
// Chunks implements SeriesWithChunks interface.
// It returns the series' underlying chunk slice as-is, without copying.
func (s *chunkSeries) Chunks() []chunk.Chunk { return s.chunks }