4
4
"context"
5
5
"fmt"
6
6
"sort"
7
+ "strings"
7
8
8
9
"github.com/pkg/errors"
9
10
"github.com/prometheus/prometheus/pkg/labels"
@@ -13,12 +14,14 @@ import (
13
14
"github.com/weaveworks/common/user"
14
15
15
16
"github.com/cortexproject/cortex/pkg/tenant"
17
+ "github.com/cortexproject/cortex/pkg/util/concurrency"
16
18
)
17
19
18
20
const (
19
21
defaultTenantLabel = "__tenant_id__"
20
22
retainExistingPrefix = "original_"
21
23
originalDefaultTenantLabel = retainExistingPrefix + defaultTenantLabel
24
+ maxConcurrency = 16
22
25
)
23
26
24
27
// NewQueryable returns a queryable that iterates through all the tenant IDs
@@ -65,6 +68,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
65
68
}
66
69
67
70
return & mergeQuerier {
71
+ ctx : ctx ,
68
72
queriers : queriers ,
69
73
tenantIDs : tenantIDs ,
70
74
}, nil
@@ -77,6 +81,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
77
81
// overwritten by the tenant ID and the previous value is exposed through a new
78
82
// label prefixed with "original_". This behaviour is not implemented recursively
79
83
type mergeQuerier struct {
84
+ ctx context.Context
80
85
queriers []storage.Querier
81
86
tenantIDs []string
82
87
}
@@ -97,7 +102,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
97
102
name = defaultTenantLabel
98
103
}
99
104
100
- return m .mergeDistinctStringSlice (func (q storage.Querier ) ([]string , storage.Warnings , error ) {
105
+ return m .mergeDistinctStringSlice (func (ctx context. Context , q storage.Querier ) ([]string , storage.Warnings , error ) {
101
106
return q .LabelValues (name , matchers ... )
102
107
})
103
108
}
@@ -106,7 +111,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
106
111
// queriers. It also adds the defaultTenantLabel and if present in the original
107
112
// results the originalDefaultTenantLabel
108
113
func (m * mergeQuerier ) LabelNames () ([]string , storage.Warnings , error ) {
109
- labelNames , warnings , err := m .mergeDistinctStringSlice (func (q storage.Querier ) ([]string , storage.Warnings , error ) {
114
+ labelNames , warnings , err := m .mergeDistinctStringSlice (func (ctx context. Context , q storage.Querier ) ([]string , storage.Warnings , error ) {
110
115
return q .LabelNames ()
111
116
})
112
117
if err != nil {
@@ -137,27 +142,64 @@ func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
137
142
return labelNames , warnings , nil
138
143
}
139
144
140
- type stringSliceFunc func (storage.Querier ) ([]string , storage.Warnings , error )
145
+ type stringSliceFunc func (context.Context , storage.Querier ) ([]string , storage.Warnings , error )
146
+
147
+ type stringSliceFuncJob struct {
148
+ querier storage.Querier
149
+ tenantID string
150
+ result []string
151
+ warnings storage.Warnings
152
+ }
141
153
142
154
// mergeDistinctStringSlice is aggregating results from stringSliceFunc calls
143
- // on a querier. It removes duplicates and sorts the result. It doesn't require
144
- // the output of the stringSliceFunc to be sorted, as results of LabelValues
145
- // are not sorted.
146
- //
147
- // TODO: Consider running stringSliceFunc calls concurrently
155
+ // on per querier in parallel. It removes duplicates and sorts the result. It
156
+ // doesn't require the output of the stringSliceFunc to be sorted, as results
157
+ // of LabelValues are not sorted.
148
158
func (m * mergeQuerier ) mergeDistinctStringSlice (f stringSliceFunc ) ([]string , storage.Warnings , error ) {
159
+ var jobs = make ([]interface {}, len (m .tenantIDs ))
160
+
161
+ for pos := range m .tenantIDs {
162
+ jobs [pos ] = & stringSliceFuncJob {
163
+ querier : m .queriers [pos ],
164
+ tenantID : m .tenantIDs [pos ],
165
+ }
166
+ }
167
+
168
+ run := func (ctx context.Context , jobIntf interface {}) error {
169
+ job , ok := jobIntf .(* stringSliceFuncJob )
170
+ if ! ok {
171
+ return fmt .Errorf ("unexpected type %T" , jobIntf )
172
+ }
173
+
174
+ var err error
175
+ job .result , job .warnings , err = f (ctx , job .querier )
176
+ if err != nil {
177
+ return errors .Wrapf (err , "error querying %s %s" , rewriteLabelName (defaultTenantLabel ), job .tenantID )
178
+ }
179
+
180
+ return nil
181
+ }
182
+
183
+ err := concurrency .ForEach (m .ctx , jobs , maxConcurrency , run )
184
+ if err != nil {
185
+ return nil , nil , err
186
+ }
187
+
188
+ // aggregate warnings and deduplicate string results
149
189
var warnings storage.Warnings
150
190
resultMap := make (map [string ]struct {})
151
- for pos , tenantID := range m . tenantIDs {
152
- result , resultWarnings , err := f ( m . queriers [ pos ] )
153
- if err != nil {
154
- return nil , nil , err
191
+ for _ , jobIntf := range jobs {
192
+ job , ok := jobIntf .( * stringSliceFuncJob )
193
+ if ! ok {
194
+ return nil , nil , fmt . Errorf ( "unexpected type %T" , jobIntf )
155
195
}
156
- for _ , e := range result {
196
+
197
+ for _ , e := range job .result {
157
198
resultMap [e ] = struct {}{}
158
199
}
159
- for _ , w := range resultWarnings {
160
- warnings = append (warnings , fmt .Errorf ("error querying tenant id %s: %w" , tenantID , w ))
200
+
201
+ for _ , w := range job .warnings {
202
+ warnings = append (warnings , errors .Wrapf (w , "warning querying %s %s" , rewriteLabelName (defaultTenantLabel ), job .tenantID ))
161
203
}
162
204
}
163
205
@@ -173,33 +215,60 @@ func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, st
173
215
func (m * mergeQuerier ) Close () error {
174
216
errs := tsdb_errors .NewMulti ()
175
217
for pos , tenantID := range m .tenantIDs {
176
- errs .Add (errors .Wrapf (m .queriers [pos ].Close (), "failed to close querier for tenant id %s" , tenantID ))
218
+ errs .Add (errors .Wrapf (m .queriers [pos ].Close (), "failed to close querier for %s %s" , rewriteLabelName ( defaultTenantLabel ) , tenantID ))
177
219
}
178
220
return errs .Err ()
179
221
}
180
222
223
+ type selectJob struct {
224
+ pos int
225
+ querier storage.Querier
226
+ tenantID string
227
+ }
228
+
181
229
// Select returns a set of series that matches the given label matchers. If the
182
230
// tenantLabelName is matched on it only considers those queriers matching. The
183
231
// forwarded labelSelector is not containing those that operate on
184
232
// tenantLabelName.
185
233
func (m * mergeQuerier ) Select (sortSeries bool , hints * storage.SelectHints , matchers ... * labels.Matcher ) storage.SeriesSet {
186
234
matchedTenants , filteredMatchers := filterValuesByMatchers (defaultTenantLabel , m .tenantIDs , matchers ... )
187
- var seriesSets = make ([]storage.SeriesSet , 0 , len (matchedTenants ))
188
- for pos , tenantID := range m .tenantIDs {
189
- if _ , matched := matchedTenants [tenantID ]; ! matched {
235
+ var jobs = make ([]interface {}, len (matchedTenants ))
236
+ var seriesSets = make ([]storage.SeriesSet , len (matchedTenants ))
237
+ var jobPos int
238
+ for tenantPos := range m .tenantIDs {
239
+ if _ , matched := matchedTenants [m.tenantIDs [tenantPos ]]; ! matched {
190
240
continue
191
241
}
192
- seriesSets = append (seriesSets , & addLabelsSeriesSet {
193
- // TODO: Consider running Select calls concurrently
194
- upstream : m .queriers [pos ].Select (sortSeries , hints , filteredMatchers ... ),
242
+ jobs [jobPos ] = & selectJob {
243
+ pos : jobPos ,
244
+ querier : m .queriers [tenantPos ],
245
+ tenantID : m .tenantIDs [tenantPos ],
246
+ }
247
+ jobPos ++
248
+ }
249
+
250
+ run := func (ctx context.Context , jobIntf interface {}) error {
251
+ job , ok := jobIntf .(* selectJob )
252
+ if ! ok {
253
+ return fmt .Errorf ("unexpected type %T" , jobIntf )
254
+ }
255
+ seriesSets [job .pos ] = & addLabelsSeriesSet {
256
+ upstream : job .querier .Select (sortSeries , hints , filteredMatchers ... ),
195
257
labels : labels.Labels {
196
258
{
197
259
Name : defaultTenantLabel ,
198
- Value : tenantID ,
260
+ Value : job . tenantID ,
199
261
},
200
262
},
201
- })
263
+ }
264
+ return nil
202
265
}
266
+
267
+ err := concurrency .ForEach (m .ctx , jobs , maxConcurrency , run )
268
+ if err != nil {
269
+ return storage .ErrSeriesSet (err )
270
+ }
271
+
203
272
return storage .NewMergeSeriesSet (seriesSets , storage .ChainedSeriesMerge )
204
273
}
205
274
@@ -266,13 +335,32 @@ func (m *addLabelsSeriesSet) At() storage.Series {
266
335
// The error that iteration as failed with.
267
336
// When an error occurs, set cannot continue to iterate.
268
337
func (m * addLabelsSeriesSet ) Err () error {
269
- return m .upstream .Err ()
338
+ return errors . Wrapf ( m .upstream .Err (), "error querying %s" , labelsToString ( m . labels ) )
270
339
}
271
340
272
341
// A collection of warnings for the whole set.
273
342
// Warnings could be return even iteration has not failed with error.
274
343
func (m * addLabelsSeriesSet ) Warnings () storage.Warnings {
275
- return m .upstream .Warnings ()
344
+ upstream := m .upstream .Warnings ()
345
+ warnings := make (storage.Warnings , len (upstream ))
346
+ for pos := range upstream {
347
+ warnings [pos ] = errors .Wrapf (upstream [pos ], "warning querying %s" , labelsToString (m .labels ))
348
+ }
349
+ return warnings
350
+ }
351
+
352
+ // rewrite label name to be more readable in error output
353
+ func rewriteLabelName (s string ) string {
354
+ return strings .TrimRight (strings .TrimLeft (s , "_" ), "_" )
355
+ }
356
+
357
+ // this outputs a more readable error format
358
+ func labelsToString (labels labels.Labels ) string {
359
+ parts := make ([]string , len (labels ))
360
+ for pos , l := range labels {
361
+ parts [pos ] = rewriteLabelName (l .Name ) + " " + l .Value
362
+ }
363
+ return strings .Join (parts , ", " )
276
364
}
277
365
278
366
type addLabelsSeries struct {
0 commit comments