Commit 2a54885

yeya24 authored and soniasingla committed
cleanup shipper NewWithCompacted function (thanos-io#2940)
Signed-off-by: Ben Ye <[email protected]>
Signed-off-by: soniasingla <[email protected]>
1 parent 3178f1a commit 2a54885

File tree

8 files changed: +18 -49 lines changed

cmd/thanos/rule.go (+1 -1)

@@ -626,7 +626,7 @@ func runRule(
         }
     }()

-    s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload)
+    s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)

     ctx, cancel := context.WithCancel(context.Background())

cmd/thanos/sidecar.go (+2 -6)

@@ -273,12 +273,8 @@ func runSidecar(
         return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
     }

-    var s *shipper.Shipper
-    if conf.shipper.uploadCompacted {
-        s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
-    } else {
-        s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
-    }
+    s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
+        conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)

     return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
         if uploaded, err := s.Sync(ctx); err != nil {

docs/components/sidecar.md (+2 -2)

@@ -32,7 +32,7 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t
 If you choose to use the sidecar to also upload data to object storage:

 * Must specify object storage (`--objstore.*` flags)
-* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks-experimental).
+* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks).
 * The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload, otherwise leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended.
 Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when Thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filesytem via its internal compaction mechanism, but if you have followed recommendations - data won't be modified by Prometheus before the sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838).
 * The retention is recommended to not be lower than three times the min block duration, so 6 hours. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage.

@@ -70,7 +70,7 @@ config:
   bucket: example-bucket
 ```

-## Upload compacted blocks (EXPERIMENTAL)
+## Upload compacted blocks

 If you want to migrate from a pure Prometheus setup to Thanos and have to keep the historical data, you can use the flag `--shipper.upload-compacted`. This will also upload blocks that were compacted by Prometheus. Values greater than 1 in the `compaction.level` field of a Prometheus block’s `meta.json` file indicate level of compaction.
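To see which local blocks the documented flag affects, you can inspect each block's `meta.json` directly. The following is a small illustrative Go helper, not part of this commit: it assumes a TSDB data directory is passed as the first argument, and its `blockMeta` struct mirrors only the `ulid` and `compaction.level` fields quoted in the docs above.

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// blockMeta mirrors only the meta.json fields referenced above;
// the real file contains more.
type blockMeta struct {
	ULID       string `json:"ulid"`
	Compaction struct {
		Level int `json:"level"`
	} `json:"compaction"`
}

func main() {
	dir := os.Args[1] // e.g. the Prometheus --storage.tsdb.path directory
	entries, err := os.ReadDir(dir)
	if err != nil {
		panic(err)
	}
	for _, e := range entries {
		raw, err := os.ReadFile(filepath.Join(dir, e.Name(), "meta.json"))
		if err != nil {
			continue // not a block directory
		}
		var m blockMeta
		if err := json.Unmarshal(raw, &m); err != nil {
			continue
		}
		if m.Compaction.Level > 1 {
			fmt.Printf("%s level=%d compacted: uploaded only with --shipper.upload-compacted\n", m.ULID, m.Compaction.Level)
		} else {
			fmt.Printf("%s level=%d uncompacted: uploaded by default\n", m.ULID, m.Compaction.Level)
		}
	}
}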

pkg/receive/multitsdb.go (+1)

@@ -282,6 +282,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
     t.bucket,
     func() labels.Labels { return lbls },
     metadata.ReceiveSource,
+    false,
     t.allowOutOfOrderUpload,
 )
 }

pkg/reloader/reloader.go (+1 -1)

@@ -288,7 +288,7 @@ func (r *Reloader) apply(ctx context.Context) error {
     return err
 }

-// filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not
+// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
 // follow symlinks. Make sure to follow a symlink before checking
 // if it is a directory.
 targetFile, err := os.Stat(path)

pkg/shipper/shipper.go (+6 -34)

@@ -83,15 +83,17 @@ type Shipper struct {
     allowOutOfOrderUploads bool
 }

-// New creates a new shipper that detects new TSDB blocks in dir and uploads them
-// to remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
+// New creates a new shipper that detects new TSDB blocks in dir and uploads them to
+// remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
+// If uploadCompacted is enabled, it also uploads compacted blocks which are already in filesystem.
 func New(
     logger log.Logger,
     r prometheus.Registerer,
     dir string,
     bucket objstore.Bucket,
     lbls func() labels.Labels,
     source metadata.SourceType,
+    uploadCompacted bool,
     allowOutOfOrderUploads bool,
 ) *Shipper {
     if logger == nil {

@@ -106,40 +108,10 @@ func New(
     dir:    dir,
     bucket: bucket,
     labels: lbls,
-    metrics: newMetrics(r, false),
+    metrics: newMetrics(r, uploadCompacted),
     source: source,
     allowOutOfOrderUploads: allowOutOfOrderUploads,
-    }
-}
-
-// NewWithCompacted creates a new shipper that detects new TSDB blocks in dir and uploads them
-// to remote if necessary, including compacted blocks which are already in filesystem.
-// It attaches the Thanos metadata section in each meta JSON file.
-func NewWithCompacted(
-    logger log.Logger,
-    r prometheus.Registerer,
-    dir string,
-    bucket objstore.Bucket,
-    lbls func() labels.Labels,
-    source metadata.SourceType,
-    allowOutOfOrderUploads bool,
-) *Shipper {
-    if logger == nil {
-        logger = log.NewNopLogger()
-    }
-    if lbls == nil {
-        lbls = func() labels.Labels { return nil }
-    }
-
-    return &Shipper{
-        logger: logger,
-        dir:    dir,
-        bucket: bucket,
-        labels: lbls,
-        metrics: newMetrics(r, true),
-        source: source,
-        uploadCompacted: true,
-        allowOutOfOrderUploads: allowOutOfOrderUploads,
+    uploadCompacted: uploadCompacted,
     }
 }
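For code outside this repository that still calls the removed constructor, the migration is mechanical: NewWithCompacted(...) becomes New(...) with the new uploadCompacted argument set to true. Below is a minimal sketch under that assumption; the wrapper name newUploader and its parameters are illustrative, and the import paths reflect the Thanos module layout around the time of this commit, so check them against the version you vendor.

package example

import (
	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
	"github.com/thanos-io/thanos/pkg/shipper"
)

// newUploader shows the post-#2940 constructor call. Before this commit the
// same behaviour required choosing between shipper.New and
// shipper.NewWithCompacted; now uploadCompacted is an explicit argument.
func newUploader(
	logger log.Logger,
	reg prometheus.Registerer,
	dataDir string,
	bkt objstore.Bucket,
	lset labels.Labels,
	uploadCompacted, allowOutOfOrderUpload bool,
) *shipper.Shipper {
	return shipper.New(
		logger, reg, dataDir, bkt,
		func() labels.Labels { return lset },
		metadata.SidecarSource,
		uploadCompacted, // true replaces the removed NewWithCompacted
		allowOutOfOrderUpload,
	)
}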

pkg/shipper/shipper_e2e_test.go (+2 -2)

@@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
     }()

     extLset := labels.FromStrings("prometheus", "prom-1")
-    shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false)
+    shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false)

     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

@@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
     defer upcancel2()
     testutil.Ok(t, p.WaitPrometheusUp(upctx2))

-    shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false)
+    shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false)

     // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
     var (

pkg/shipper/shipper_test.go (+3 -3)

@@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) {
     testutil.Ok(t, os.RemoveAll(dir))
     }()

-    s := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
+    s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)

     // Missing thanos meta file.
     _, _, err = s.Timestamps()

@@ -123,7 +123,7 @@ func TestIterBlockMetas(t *testing.T) {
     },
 }))

-    shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
+    shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)
     metas, err := shipper.blockMetasFromOldest()
     testutil.Ok(t, err)
     testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {

@@ -162,7 +162,7 @@ func BenchmarkIterBlockMetas(b *testing.B) {
     })
     b.ResetTimer()

-    shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
+    shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)

     _, err = shipper.blockMetasFromOldest()
     testutil.Ok(b, err)
