Skip to content

Commit 2be2db7

Browse files
authored
Fix -compact.skip-block-with-out-of-order-chunks (#4566)
Signed-off-by: Marco Pracucci <[email protected]>
1 parent 79d9480 commit 2be2db7

File tree

3 files changed

+79
-81
lines changed

3 files changed

+79
-81
lines changed

cmd/thanos/compact.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,6 @@ func runCompact(
352352
compactMetrics.garbageCollectedBlocks,
353353
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
354354
metadata.HashFunc(conf.hashFunc),
355-
conf.skipBlockWithOutOfOrderChunks,
356355
)
357356
planner := compact.WithLargeTotalIndexSizeFilter(
358357
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
@@ -370,6 +369,7 @@ func runCompact(
370369
compactDir,
371370
bkt,
372371
conf.compactionConcurrency,
372+
conf.skipBlockWithOutOfOrderChunks,
373373
)
374374
if err != nil {
375375
return errors.Wrap(err, "create bucket compactor")

pkg/compact/compact.go

+75-77
Original file line numberDiff line numberDiff line change
@@ -229,20 +229,19 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
229229
// DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample
230230
// resolution and block's labels.
231231
type DefaultGrouper struct {
232-
bkt objstore.Bucket
233-
logger log.Logger
234-
acceptMalformedIndex bool
235-
enableVerticalCompaction bool
236-
compactions *prometheus.CounterVec
237-
compactionRunsStarted *prometheus.CounterVec
238-
compactionRunsCompleted *prometheus.CounterVec
239-
compactionFailures *prometheus.CounterVec
240-
verticalCompactions *prometheus.CounterVec
241-
garbageCollectedBlocks prometheus.Counter
242-
blocksMarkedForDeletion prometheus.Counter
243-
blocksMarkedForNoCompact prometheus.Counter
244-
hashFunc metadata.HashFunc
245-
skipChunksWithOutOfOrderBlocks bool
232+
bkt objstore.Bucket
233+
logger log.Logger
234+
acceptMalformedIndex bool
235+
enableVerticalCompaction bool
236+
compactions *prometheus.CounterVec
237+
compactionRunsStarted *prometheus.CounterVec
238+
compactionRunsCompleted *prometheus.CounterVec
239+
compactionFailures *prometheus.CounterVec
240+
verticalCompactions *prometheus.CounterVec
241+
garbageCollectedBlocks prometheus.Counter
242+
blocksMarkedForDeletion prometheus.Counter
243+
blocksMarkedForNoCompact prometheus.Counter
244+
hashFunc metadata.HashFunc
246245
}
247246

248247
// NewDefaultGrouper makes a new DefaultGrouper.
@@ -256,7 +255,6 @@ func NewDefaultGrouper(
256255
garbageCollectedBlocks prometheus.Counter,
257256
blocksMarkedForNoCompact prometheus.Counter,
258257
hashFunc metadata.HashFunc,
259-
skipChunksWithOutOfOrderBlocks bool,
260258
) *DefaultGrouper {
261259
return &DefaultGrouper{
262260
bkt: bkt,
@@ -283,11 +281,10 @@ func NewDefaultGrouper(
283281
Name: "thanos_compact_group_vertical_compactions_total",
284282
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
285283
}, []string{"group"}),
286-
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
287-
garbageCollectedBlocks: garbageCollectedBlocks,
288-
blocksMarkedForDeletion: blocksMarkedForDeletion,
289-
hashFunc: hashFunc,
290-
skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
284+
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
285+
garbageCollectedBlocks: garbageCollectedBlocks,
286+
blocksMarkedForDeletion: blocksMarkedForDeletion,
287+
hashFunc: hashFunc,
291288
}
292289
}
293290

@@ -317,7 +314,6 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
317314
g.blocksMarkedForDeletion,
318315
g.blocksMarkedForNoCompact,
319316
g.hashFunc,
320-
g.skipChunksWithOutOfOrderBlocks,
321317
)
322318
if err != nil {
323319
return nil, errors.Wrap(err, "create compaction group")
@@ -338,25 +334,24 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
338334
// Group captures a set of blocks that have the same origin labels and downsampling resolution.
339335
// Those blocks generally contain the same series and can thus efficiently be compacted.
340336
type Group struct {
341-
logger log.Logger
342-
bkt objstore.Bucket
343-
key string
344-
labels labels.Labels
345-
resolution int64
346-
mtx sync.Mutex
347-
metasByMinTime []*metadata.Meta
348-
acceptMalformedIndex bool
349-
enableVerticalCompaction bool
350-
compactions prometheus.Counter
351-
compactionRunsStarted prometheus.Counter
352-
compactionRunsCompleted prometheus.Counter
353-
compactionFailures prometheus.Counter
354-
verticalCompactions prometheus.Counter
355-
groupGarbageCollectedBlocks prometheus.Counter
356-
blocksMarkedForDeletion prometheus.Counter
357-
blocksMarkedForNoCompact prometheus.Counter
358-
hashFunc metadata.HashFunc
359-
skipChunksWithOutofOrderBlocks bool
337+
logger log.Logger
338+
bkt objstore.Bucket
339+
key string
340+
labels labels.Labels
341+
resolution int64
342+
mtx sync.Mutex
343+
metasByMinTime []*metadata.Meta
344+
acceptMalformedIndex bool
345+
enableVerticalCompaction bool
346+
compactions prometheus.Counter
347+
compactionRunsStarted prometheus.Counter
348+
compactionRunsCompleted prometheus.Counter
349+
compactionFailures prometheus.Counter
350+
verticalCompactions prometheus.Counter
351+
groupGarbageCollectedBlocks prometheus.Counter
352+
blocksMarkedForDeletion prometheus.Counter
353+
blocksMarkedForNoCompact prometheus.Counter
354+
hashFunc metadata.HashFunc
360355
}
361356

362357
// NewGroup returns a new compaction group.
@@ -377,29 +372,27 @@ func NewGroup(
377372
blocksMarkedForDeletion prometheus.Counter,
378373
blockMakredForNoCopmact prometheus.Counter,
379374
hashFunc metadata.HashFunc,
380-
skipChunksWithOutOfOrderChunks bool,
381375
) (*Group, error) {
382376
if logger == nil {
383377
logger = log.NewNopLogger()
384378
}
385379
g := &Group{
386-
logger: logger,
387-
bkt: bkt,
388-
key: key,
389-
labels: lset,
390-
resolution: resolution,
391-
acceptMalformedIndex: acceptMalformedIndex,
392-
enableVerticalCompaction: enableVerticalCompaction,
393-
compactions: compactions,
394-
compactionRunsStarted: compactionRunsStarted,
395-
compactionRunsCompleted: compactionRunsCompleted,
396-
compactionFailures: compactionFailures,
397-
verticalCompactions: verticalCompactions,
398-
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
399-
blocksMarkedForDeletion: blocksMarkedForDeletion,
400-
blocksMarkedForNoCompact: blockMakredForNoCopmact,
401-
hashFunc: hashFunc,
402-
skipChunksWithOutofOrderBlocks: skipChunksWithOutOfOrderChunks,
380+
logger: logger,
381+
bkt: bkt,
382+
key: key,
383+
labels: lset,
384+
resolution: resolution,
385+
acceptMalformedIndex: acceptMalformedIndex,
386+
enableVerticalCompaction: enableVerticalCompaction,
387+
compactions: compactions,
388+
compactionRunsStarted: compactionRunsStarted,
389+
compactionRunsCompleted: compactionRunsCompleted,
390+
compactionFailures: compactionFailures,
391+
verticalCompactions: verticalCompactions,
392+
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
393+
blocksMarkedForDeletion: blocksMarkedForDeletion,
394+
blocksMarkedForNoCompact: blockMakredForNoCopmact,
395+
hashFunc: hashFunc,
403396
}
404397
return g, nil
405398
}
@@ -783,7 +776,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
783776
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
784777
}
785778

786-
if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutofOrderBlocks && err != nil {
779+
if err := stats.OutOfOrderChunksErr(); err != nil {
787780
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
788781
}
789782

@@ -890,14 +883,15 @@ func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error {
890883

891884
// BucketCompactor compacts blocks in a bucket.
892885
type BucketCompactor struct {
893-
logger log.Logger
894-
sy *Syncer
895-
grouper Grouper
896-
comp Compactor
897-
planner Planner
898-
compactDir string
899-
bkt objstore.Bucket
900-
concurrency int
886+
logger log.Logger
887+
sy *Syncer
888+
grouper Grouper
889+
comp Compactor
890+
planner Planner
891+
compactDir string
892+
bkt objstore.Bucket
893+
concurrency int
894+
skipBlocksWithOutOfOrderChunks bool
901895
}
902896

903897
// NewBucketCompactor creates a new bucket compactor.
@@ -910,19 +904,21 @@ func NewBucketCompactor(
910904
compactDir string,
911905
bkt objstore.Bucket,
912906
concurrency int,
907+
skipBlocksWithOutOfOrderChunks bool,
913908
) (*BucketCompactor, error) {
914909
if concurrency <= 0 {
915910
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
916911
}
917912
return &BucketCompactor{
918-
logger: logger,
919-
sy: sy,
920-
grouper: grouper,
921-
planner: planner,
922-
comp: comp,
923-
compactDir: compactDir,
924-
bkt: bkt,
925-
concurrency: concurrency,
913+
logger: logger,
914+
sy: sy,
915+
grouper: grouper,
916+
planner: planner,
917+
comp: comp,
918+
compactDir: compactDir,
919+
bkt: bkt,
920+
concurrency: concurrency,
921+
skipBlocksWithOutOfOrderChunks: skipBlocksWithOutOfOrderChunks,
926922
}, nil
927923
}
928924

@@ -977,8 +973,10 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
977973
continue
978974
}
979975
}
980-
// if block has out of order chunk, mark the block for no compaction and continue.
981-
if IsOutOfOrderChunkError(err) {
976+
// If block has out of order chunk and it has been configured to skip it,
977+
// then we can mark the block for no compaction so that the next compaction run
978+
// will skip it.
979+
if IsOutOfOrderChunkError(err) && c.skipBlocksWithOutOfOrderChunks {
982980
if err := block.MarkForNoCompact(
983981
ctx,
984982
c.logger,

pkg/compact/compact_e2e_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
139139
testutil.Ok(t, sy.GarbageCollect(ctx))
140140

141141
// Only the level 3 block, the last source block in both resolutions should be left.
142-
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, true)
142+
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc)
143143
groups, err := grouper.Groups(sy.Metas())
144144
testutil.Ok(t, err)
145145

@@ -214,8 +214,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
214214
testutil.Ok(t, err)
215215

216216
planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
217-
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, true)
218-
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2)
217+
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc)
218+
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
219219
testutil.Ok(t, err)
220220

221221
// Compaction on empty should not fail.

0 commit comments

Comments (0)