@@ -41,32 +41,43 @@ func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) stor
41
41
}
42
42
43
43
func tenantQuerierCallback (queryable storage.Queryable ) MergeQuerierCallback {
44
- return func (ctx context.Context , mint int64 , maxt int64 ) ([]string , []storage.Querier , error ) {
44
+ return func (ctx context.Context , mint int64 , maxt int64 ) ([]string , []storage.Querier , func (), error ) {
45
45
tenantIDs , err := tenant .TenantIDs (ctx )
46
46
if err != nil {
47
- return nil , nil , err
47
+ return nil , nil , nil , err
48
48
}
49
49
50
+ var spans = make ([]* spanlogger.SpanLogger , len (tenantIDs ))
51
+
50
52
var queriers = make ([]storage.Querier , len (tenantIDs ))
51
53
for pos , tenantID := range tenantIDs {
54
+ sctx := user .InjectOrgID (ctx , tenantID )
55
+ spans [pos ], sctx = spanlogger .New (sctx , "mergeQuerier.NewQuerier" )
56
+
52
57
q , err := queryable .Querier (
53
- user . InjectOrgID ( ctx , tenantID ) ,
58
+ sctx ,
54
59
mint ,
55
60
maxt ,
56
61
)
57
62
if err != nil {
58
- return nil , nil , err
63
+ return nil , nil , nil , err
59
64
}
60
65
queriers [pos ] = q
61
66
}
62
67
63
- return tenantIDs , queriers , nil
68
+ closer := func () {
69
+ for _ , span := range spans {
70
+ span .Finish ()
71
+ }
72
+ }
73
+
74
+ return tenantIDs , queriers , closer , nil
64
75
}
65
76
}
66
77
67
78
// MergeQuerierCallback returns the underlying queriers and their IDs relevant
68
79
// for the query.
69
- type MergeQuerierCallback func (ctx context.Context , mint int64 , maxt int64 ) (ids []string , queriers []storage.Querier , err error )
80
+ type MergeQuerierCallback func (ctx context.Context , mint int64 , maxt int64 ) (ids []string , queriers []storage.Querier , closer func (), err error )
70
81
71
82
// NewMergeQueryable returns a queryable that merges results from multiple
72
83
// underlying Queryables. The underlying queryables and its label values to be
@@ -100,7 +111,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
100
111
// TODO: it's necessary to think how to override context inside querier
101
112
// to mark spans created inside querier as child of a span created inside
102
113
// methods of merged querier.
103
- ids , queriers , err := m .callback (ctx , mint , maxt )
114
+ ids , queriers , closer , err := m .callback (ctx , mint , maxt )
104
115
if err != nil {
105
116
return nil , err
106
117
}
@@ -115,6 +126,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
115
126
idLabelName : m .idLabelName ,
116
127
queriers : queriers ,
117
128
ids : ids ,
129
+ closer : closer ,
118
130
}, nil
119
131
}
120
132
@@ -129,6 +141,7 @@ type mergeQuerier struct {
129
141
queriers []storage.Querier
130
142
idLabelName string
131
143
ids []string
144
+ closer func ()
132
145
}
133
146
134
147
// LabelValues returns all potential values for a label name. It is not safe
@@ -262,6 +275,7 @@ func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, st
262
275
263
276
// Close releases the resources of the Querier.
264
277
func (m * mergeQuerier ) Close () error {
278
+ m .closer ()
265
279
errs := tsdb_errors .NewMulti ()
266
280
for pos , id := range m .ids {
267
281
errs .Add (errors .Wrapf (m .queriers [pos ].Close (), "failed to close querier for %s %s" , rewriteLabelName (m .idLabelName ), id ))
0 commit comments