@@ -9,21 +9,18 @@ import (
9
9
"os"
10
10
"path"
11
11
"sync"
12
- "time"
13
12
14
13
"github.com/go-kit/kit/log"
15
14
"github.com/go-kit/kit/log/level"
16
15
"github.com/pkg/errors"
17
16
"github.com/prometheus/client_golang/prometheus"
18
- "github.com/prometheus/common/model"
19
17
"github.com/prometheus/prometheus/pkg/labels"
20
18
"github.com/prometheus/prometheus/storage"
21
19
"github.com/prometheus/prometheus/tsdb"
22
20
terrors "github.com/prometheus/prometheus/tsdb/errors"
23
21
"github.com/thanos-io/thanos/pkg/block/metadata"
24
22
"github.com/thanos-io/thanos/pkg/component"
25
23
"github.com/thanos-io/thanos/pkg/objstore"
26
- "github.com/thanos-io/thanos/pkg/runutil"
27
24
"github.com/thanos-io/thanos/pkg/shipper"
28
25
"github.com/thanos-io/thanos/pkg/store"
29
26
"golang.org/x/sync/errgroup"
@@ -72,8 +69,6 @@ func NewMultiTSDB(
72
69
}
73
70
74
71
type tenant struct {
75
- tsdbOpts * tsdb.Options
76
-
77
72
readyS * ReadyStorage
78
73
tsdb * tsdb.DB
79
74
storeTSDB * store.TSDBStore
@@ -82,11 +77,10 @@ type tenant struct {
82
77
mtx * sync.RWMutex
83
78
}
84
79
85
- func newTenant (tsdbOpts * tsdb. Options ) * tenant {
80
+ func newTenant () * tenant {
86
81
return & tenant {
87
- tsdbOpts : tsdbOpts ,
88
- readyS : & ReadyStorage {},
89
- mtx : & sync.RWMutex {},
82
+ readyS : & ReadyStorage {},
83
+ mtx : & sync.RWMutex {},
90
84
}
91
85
}
92
86
@@ -113,7 +107,7 @@ func (t *tenant) db() *tsdb.DB {
113
107
}
114
108
115
109
func (t * tenant ) set (storeTSDB * store.TSDBStore , tenantTSDB * tsdb.DB , ship * shipper.Shipper ) {
116
- t .readyS .Set (tenantTSDB , int64 ( 2 * time . Duration ( t . tsdbOpts . MinBlockDuration ). Seconds () * 1000 ) )
110
+ t .readyS .Set (tenantTSDB )
117
111
t .mtx .Lock ()
118
112
t .tsdb = tenantTSDB
119
113
t .storeTSDB = storeTSDB
@@ -220,6 +214,47 @@ func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
220
214
return res
221
215
}
222
216
217
+ func (t * MultiTSDB ) startTSDB (logger log.Logger , tenantID string , tenant * tenant ) error {
218
+ reg := prometheus .WrapRegistererWith (prometheus.Labels {"tenant" : tenantID }, t .reg )
219
+ lbls := append (t .labels , labels.Label {Name : t .tenantLabelName , Value : tenantID })
220
+ dataDir := path .Join (t .dataDir , tenantID )
221
+
222
+ opts := * t .tsdbOpts
223
+ s , err := tsdb .Open (
224
+ dataDir ,
225
+ logger ,
226
+ & UnRegisterer {Registerer : reg },
227
+ & opts ,
228
+ )
229
+ if err != nil {
230
+ t .mtx .Lock ()
231
+ delete (t .tenants , tenantID )
232
+ t .mtx .Unlock ()
233
+ return err
234
+ }
235
+
236
+ var ship * shipper.Shipper
237
+ if t .bucket != nil {
238
+ ship = shipper .New (
239
+ logger ,
240
+ reg ,
241
+ dataDir ,
242
+ t .bucket ,
243
+ func () labels.Labels { return lbls },
244
+ metadata .ReceiveSource ,
245
+ t .allowOutOfOrderUpload ,
246
+ )
247
+ }
248
+ tenant .set (store .NewTSDBStore (
249
+ logger ,
250
+ reg ,
251
+ s ,
252
+ component .Receive ,
253
+ lbls ,
254
+ ), s , ship )
255
+
256
+ return nil
257
+ }
223
258
func (t * MultiTSDB ) getOrLoadTenant (tenantID string , blockingStart bool ) (* tenant , error ) {
224
259
// Fast path, as creating tenants is a very rare operation.
225
260
t .mtx .RLock ()
@@ -239,68 +274,20 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
239
274
return tenant , nil
240
275
}
241
276
242
- tenant = newTenant (t . tsdbOpts )
277
+ tenant = newTenant ()
243
278
t .tenants [tenantID ] = tenant
244
279
t .mtx .Unlock ()
245
280
246
- var err error
247
- startTSDB := func () {
248
- reg := prometheus .WrapRegistererWith (prometheus.Labels {
249
- "tenant" : tenantID ,
250
- }, t .reg )
251
- logger := log .With (t .logger , "tenant" , tenantID )
252
- lbls := append (t .labels , labels.Label {Name : t .tenantLabelName , Value : tenantID })
253
- dataDir := path .Join (t .dataDir , tenantID )
254
-
255
- var ship * shipper.Shipper
256
- if t .bucket != nil {
257
- ship = shipper .New (
258
- logger ,
259
- reg ,
260
- dataDir ,
261
- t .bucket ,
262
- func () labels.Labels { return lbls },
263
- metadata .ReceiveSource ,
264
- t .allowOutOfOrderUpload ,
265
- )
266
- }
267
-
268
- s , err := tsdb .Open (
269
- dataDir ,
270
- logger ,
271
- & UnRegisterer {Registerer : reg },
272
- t .tsdbOpts ,
273
- )
274
-
275
- // Assign to outer error to report in blocking case.
276
- if err != nil {
277
- level .Error (logger ).Log ("msg" , "failed to open tsdb" , "err" , err )
278
- t .mtx .Lock ()
279
- delete (t .tenants , tenantID )
280
- t .mtx .Unlock ()
281
- runutil .CloseWithLogOnErr (logger , s , "failed to close tsdb" )
282
- return
283
- }
284
-
285
- tenant .set (
286
- store .NewTSDBStore (
287
- logger ,
288
- reg ,
289
- s ,
290
- component .Receive ,
291
- lbls ,
292
- ),
293
- s ,
294
- ship ,
295
- )
296
- }
281
+ logger := log .With (t .logger , "tenant" , tenantID )
297
282
if ! blockingStart {
298
- go startTSDB ()
283
+ go func () {
284
+ if err := t .startTSDB (logger , tenantID , tenant ); err != nil {
285
+ level .Error (logger ).Log ("msg" , "failed to start tsdb asynchronously" , "err" , err )
286
+ }
287
+ }()
299
288
return tenant , nil
300
289
}
301
-
302
- startTSDB ()
303
- return tenant , err
290
+ return tenant , t .startTSDB (logger , tenantID , tenant )
304
291
}
305
292
306
293
func (t * MultiTSDB ) TenantAppendable (tenantID string ) (Appendable , error ) {
@@ -323,11 +310,11 @@ type ReadyStorage struct {
323
310
}
324
311
325
312
// Set the storage.
326
- func (s * ReadyStorage ) Set (db * tsdb.DB , startTimeMargin int64 ) {
313
+ func (s * ReadyStorage ) Set (db * tsdb.DB ) {
327
314
s .mtx .Lock ()
328
315
defer s .mtx .Unlock ()
329
316
330
- s .a = & adapter {db : db , startTimeMargin : startTimeMargin }
317
+ s .a = & adapter {db : db }
331
318
}
332
319
333
320
// Get the storage.
@@ -347,10 +334,7 @@ func (s *ReadyStorage) get() *adapter {
347
334
348
335
// StartTime implements the Storage interface.
349
336
func (s * ReadyStorage ) StartTime () (int64 , error ) {
350
- if x := s .get (); x != nil {
351
- return x .StartTime ()
352
- }
353
- return int64 (model .Latest ), ErrNotReady
337
+ return 0 , errors .New ("not implemented" )
354
338
}
355
339
356
340
// Querier implements the Storage interface.
@@ -379,22 +363,12 @@ func (s *ReadyStorage) Close() error {
379
363
380
364
// adapter implements a storage.Storage around TSDB.
381
365
type adapter struct {
382
- db * tsdb.DB
383
- startTimeMargin int64
366
+ db * tsdb.DB
384
367
}
385
368
386
369
// StartTime implements the Storage interface.
387
370
func (a adapter ) StartTime () (int64 , error ) {
388
- var startTime int64
389
-
390
- if len (a .db .Blocks ()) > 0 {
391
- startTime = a .db .Blocks ()[0 ].Meta ().MinTime
392
- } else {
393
- startTime = time .Now ().Unix () * 1000
394
- }
395
-
396
- // Add a safety margin as it may take a few minutes for everything to spin up.
397
- return startTime + a .startTimeMargin , nil
371
+ return 0 , errors .New ("not implemented" )
398
372
}
399
373
400
374
func (a adapter ) Querier (ctx context.Context , mint , maxt int64 ) (storage.Querier , error ) {
0 commit comments