Skip to content

Commit bdc6443

Browse files
committed
Add possibility to delete blocks older than max time
Signed-off-by: Kevin Hellemun <[email protected]>
1 parent b3651dd commit bdc6443

File tree

7 files changed

+72
-32
lines changed

7 files changed

+72
-32
lines changed

cmd/thanos/tools_bucket.go

+2
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,7 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
444444
Default("0000-01-01T00:00:00Z"))
445445
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
446446
Default("9999-12-31T23:59:59Z"))
447+
deleteOldBlocks := cmd.Flag("delete-old-blocks", "Delete blocks that are older than max-time.").Default("false").Bool()
447448

448449
m[name+" replicate"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
449450
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
@@ -471,6 +472,7 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
471472
*singleRun,
472473
minTime,
473474
maxTime,
475+
*deleteOldBlocks,
474476
)
475477
}
476478

docs/components/tools.md

+1
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ Flags:
473473
duration relative to current time, such as -1d
474474
or 2h45m. Valid duration units are ms, s, m, h,
475475
d, w, y.
476+
--delete-old-blocks Delete blocks that are older than max-time.
476477
477478
```
478479

pkg/block/block.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,20 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
123123

124124
// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
// It delegates to markForDeletion, passing time.Now() as the deletion time and
// forwarding every other argument unchanged; the returned error is whatever
// markForDeletion returns.
125125
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
126+
return markForDeletion(ctx, logger, bkt, id, time.Now(), markedForDeletion)
127+
}
128+
129+
// MarkForFutureDeletion creates a file which stores information about when the block should be deleted in the future.
130+
func MarkForFutureDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, futureDeletionTime time.Time, markedForDeletion prometheus.Counter) error {
131+
if time.Now().Before(futureDeletionTime) {
132+
return errors.New(fmt.Sprintf("deletion time %s is not in the future", futureDeletionTime.Format(time.RFC3339)))
133+
}
134+
135+
return markForDeletion(ctx, logger, bkt, id, futureDeletionTime, markedForDeletion)
136+
}
137+
138+
// markForDeletion creates a file which stores deletionTime as the time at which the block is marked for deletion.
139+
func markForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, deletionTime time.Time, markedForDeletion prometheus.Counter) error {
126140
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
127141
deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
128142
if err != nil {
@@ -135,7 +149,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
135149

136150
deletionMark, err := json.Marshal(metadata.DeletionMark{
137151
ID: id,
138-
DeletionTime: time.Now().Unix(),
152+
DeletionTime: deletionTime.Unix(),
139153
Version: metadata.DeletionMarkVersion1,
140154
})
141155
if err != nil {

pkg/block/metadata/deletionmark.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var ErrorDeletionMarkNotFound = errors.New("deletion-mark.json not found")
3232
// or the deletion-mark.json file is not a valid json file.
3333
var ErrorUnmarshalDeletionMark = errors.New("unmarshal deletion-mark.json")
3434

35-
// DeletionMark stores block id and when block was marked for deletion.
35+
// DeletionMark stores block id and when block was marked for deletion.
3636
type DeletionMark struct {
3737
// ID of the tsdb block.
3838
ID ulid.ULID `json:"id"`

pkg/replicate/replicator.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func RunReplicate(
8383
toObjStoreConfig *extflag.PathOrContent,
8484
singleRun bool,
8585
maxTime, minTime *thanosmodel.TimeOrDurationValue,
86+
deleteOldBlocks bool,
8687
) error {
8788
logger = log.With(logger, "component", "replicate")
8889

@@ -163,6 +164,14 @@ func RunReplicate(
163164
}, []string{"result"})
164165
replicationRunDuration.WithLabelValues(labelSuccess)
165166
replicationRunDuration.WithLabelValues(labelError)
167+
blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{
168+
Name: "thanos_replicate_blocks_cleaned_total",
169+
Help: "Total number of blocks deleted in replicator.",
170+
})
171+
blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{
172+
Name: "thanos_replicate_block_cleanup_failures_total",
173+
Help: "Failures encountered while deleting blocks in replicator.",
174+
})
166175

167176
fetcher, err := thanosblock.NewMetaFetcher(
168177
logger,
@@ -186,6 +195,8 @@ func RunReplicate(
186195
metrics := newReplicationMetrics(reg)
187196
ctx, cancel := context.WithCancel(context.Background())
188197

198+
blocksCleaner := compact.NewBlocksCleaner(logger, toBkt, thanosblock.NewIgnoreDeletionMarkFilter(logger, toBkt, time.Hour), time.Hour, blocksCleaned, blockCleanupFailures)
199+
189200
replicateFn := func() error {
190201
timestamp := time.Now()
191202
entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0)
@@ -198,10 +209,16 @@ func RunReplicate(
198209
logger := log.With(logger, "replication-run-id", ulid.String())
199210
level.Info(logger).Log("msg", "running replication attempt")
200211

201-
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
212+
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg, maxTime, deleteOldBlocks).execute(ctx); err != nil {
202213
return errors.Wrap(err, "replication execute")
203214
}
204215

216+
if deleteOldBlocks {
217+
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
218+
return errors.Wrap(err, "failed to delete old blocks")
219+
}
220+
}
221+
205222
return nil
206223
}
207224

pkg/replicate/scheme.go

+34-20
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import (
1111
"io/ioutil"
1212
"path"
1313
"sort"
14+
"time"
15+
16+
"github.com/thanos-io/thanos/pkg/model"
1417

1518
"github.com/go-kit/kit/log"
1619
"github.com/go-kit/kit/log/level"
@@ -114,6 +117,9 @@ type replicationScheme struct {
114117
metrics *replicationMetrics
115118

116119
reg prometheus.Registerer
120+
121+
maxTime *model.TimeOrDurationValue
122+
markBlocksForFutureDeletion bool
117123
}
118124

119125
type replicationMetrics struct {
@@ -124,6 +130,8 @@ type replicationMetrics struct {
124130
blocksAlreadyReplicated prometheus.Counter
125131
blocksReplicated prometheus.Counter
126132
objectsReplicated prometheus.Counter
133+
134+
blocksMarkedForDeletion prometheus.Counter
127135
}
128136

129137
func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
@@ -152,31 +160,29 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
152160
Name: "thanos_replicate_objects_replicated_total",
153161
Help: "Total number of objects replicated.",
154162
}),
163+
blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
164+
Name: "thanos_replicate_blocks_marked_for_deletion_total",
165+
Help: "Total number of blocks marked for deletion in replicator.",
166+
}),
155167
}
156168
return m
157169
}
158170

159-
func newReplicationScheme(
160-
logger log.Logger,
161-
metrics *replicationMetrics,
162-
blockFilter blockFilterFunc,
163-
fetcher thanosblock.MetadataFetcher,
164-
from objstore.InstrumentedBucketReader,
165-
to objstore.Bucket,
166-
reg prometheus.Registerer,
167-
) *replicationScheme {
171+
func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer, maxTime *model.TimeOrDurationValue, markFoFutureDeletion bool) *replicationScheme {
168172
if logger == nil {
169173
logger = log.NewNopLogger()
170174
}
171175

172176
return &replicationScheme{
173-
logger: logger,
174-
blockFilter: blockFilter,
175-
fetcher: fetcher,
176-
fromBkt: from,
177-
toBkt: to,
178-
metrics: metrics,
179-
reg: reg,
177+
logger: logger,
178+
blockFilter: blockFilter,
179+
fetcher: fetcher,
180+
fromBkt: from,
181+
toBkt: to,
182+
metrics: metrics,
183+
reg: reg,
184+
maxTime: maxTime,
185+
markBlocksForFutureDeletion: markFoFutureDeletion,
180186
}
181187
}
182188

@@ -231,7 +237,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
231237
})
232238

233239
for _, b := range availableBlocks {
234-
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
240+
if err := rs.ensureBlockIsReplicated(ctx, b); err != nil {
235241
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
236242
}
237243
}
@@ -241,8 +247,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
241247

242248
// ensureBlockIsReplicated ensures that a block present in the origin bucket is
243249
// present in the target bucket.
244-
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error {
245-
blockID := id.String()
250+
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *metadata.Meta) error {
251+
blockID := meta.ULID.String()
246252
chunksDir := path.Join(blockID, thanosblock.ChunksDirname)
247253
indexFile := path.Join(blockID, thanosblock.IndexFilename)
248254
metaFile := path.Join(blockID, thanosblock.MetaFilename)
@@ -281,7 +287,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
281287
// If the origin meta file content and target meta file content is
282288
// equal, we know we have already successfully replicated
283289
// previously.
284-
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String())
290+
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", meta.ULID.String())
285291
rs.metrics.blocksAlreadyReplicated.Inc()
286292

287293
return nil
@@ -309,6 +315,14 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
309315
return errors.Wrap(err, "upload meta file")
310316
}
311317

318+
if rs.markBlocksForFutureDeletion {
319+
deletionTime := time.Unix(meta.MaxTime/1000, 0).Add(time.Duration(*rs.maxTime.Dur))
320+
if err := thanosblock.MarkForFutureDeletion(ctx, rs.logger, rs.toBkt, meta.ULID, deletionTime, nil); err != nil {
321+
return errors.Wrap(err, "failed to mark block for future deletion")
322+
}
323+
rs.metrics.blocksMarkedForDeletion.Inc()
324+
}
325+
312326
rs.metrics.blocksReplicated.Inc()
313327

314328
return nil

pkg/replicate/scheme_test.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) {
315315
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
316316
testutil.Ok(t, err)
317317

318-
r := newReplicationScheme(
319-
logger,
320-
newReplicationMetrics(nil),
321-
filter,
322-
fetcher,
323-
objstore.WithNoopInstr(originBucket),
324-
targetBucket,
325-
nil,
326-
)
318+
r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil, nil, false)
327319

328320
err = r.execute(ctx)
329321
testutil.Ok(t, err)

0 commit comments

Comments
 (0)