Commit 4d751f2

Introduce SkipBlocksWithOutOfOrderChunksEnabled feature (#4707)
* Introduce SkipBlocksWithOutOfOrderChunksEnabled feature
* Update change log with PR numbers
* Add some comments
* Format test data
* Fix lint error
* Fix flaky tests
* Fix CHANGELOG conflict
* Undo my editor's massive change to CHANGELOG

Signed-off-by: Alvin Lin <[email protected]>
1 parent 738ecf6 commit 4d751f2

7 files changed: +317 -45 lines changed


CHANGELOG.md

+2 -1

@@ -2,10 +2,11 @@
 
 ## master / unreleased
 
-* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
 * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
 * [CHANGE] Enable Thanos series limiter in store-gateway. #4702
 * [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683
+* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
+* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction
 
 ## 1.12.0 in progress

docs/blocks-storage/compactor.md

+5

@@ -147,6 +147,11 @@ compactor:
   # CLI flag: -compactor.tenant-cleanup-delay
   [tenant_cleanup_delay: <duration> | default = 6h]
 
+  # When enabled, mark blocks containing index with out-of-order chunks for no
+  # compact instead of halting the compaction.
+  # CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled
+  [skip_blocks_with_out_of_order_chunks_enabled: <boolean> | default = false]
+
   # When enabled, at compactor startup the bucket will be scanned and all found
   # deletion marks inside the block location will be copied to the markers
   # global location too. This option can (and should) be safely disabled as soon

docs/configuration/config-file-reference.md

+5

@@ -5294,6 +5294,11 @@ The `compactor_config` configures the compactor for the blocks storage.
 # CLI flag: -compactor.tenant-cleanup-delay
 [tenant_cleanup_delay: <duration> | default = 6h]
 
+# When enabled, mark blocks containing index with out-of-order chunks for no
+# compact instead of halting the compaction.
+# CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled
+[skip_blocks_with_out_of_order_chunks_enabled: <boolean> | default = false]
+
 # When enabled, at compactor startup the bucket will be scanned and all found
 # deletion marks inside the block location will be copied to the markers global
 # location too. This option can (and should) be safely disabled as soon as the
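
Both documentation entries describe the same new boolean. A minimal sketch of a compactor configuration enabling it (the `data_dir` value is illustrative; everything else keeps its default):

```yaml
compactor:
  data_dir: /data/compactor
  # Write a no-compact marker for blocks whose index contains out-of-order
  # chunks, instead of halting the whole compaction loop.
  skip_blocks_with_out_of_order_chunks_enabled: true
```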

pkg/compactor/compactor.go

+55 -29
@@ -51,7 +51,7 @@ var (
     supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
     errInvalidShardingStrategy  = errors.New("invalid sharding strategy")
 
-    DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
+    DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
         return compact.NewDefaultGrouper(
             logger,
             bkt,
@@ -60,42 +60,48 @@ var (
             reg,
             blocksMarkedForDeletion,
             garbageCollectedBlocks,
-            prometheus.NewCounter(prometheus.CounterOpts{}),
+            blocksMarkedForNoCompaction,
             metadata.NoneFunc)
     }
 
-    ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
+    ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
         return NewShuffleShardingGrouper(
             logger,
             bkt,
             false, // Do not accept malformed indexes
             true,  // Enable vertical compaction
             reg,
             blocksMarkedForDeletion,
-            prometheus.NewCounter(prometheus.CounterOpts{}),
+            blocksMarkedForNoCompaction,
             garbageCollectedBlocks,
             metadata.NoneFunc,
             cfg)
     }
 
-    DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) {
+    DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
         compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
         if err != nil {
             return nil, nil, err
         }
 
-        planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds())
-        return compactor, planner, nil
+        plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner {
+            return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
+        }
+
+        return compactor, plannerFactory, nil
     }
 
-    ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) {
+    ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
         compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
         if err != nil {
             return nil, nil, err
         }
 
-        planner := NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds())
-        return compactor, planner, nil
+        plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner {
+            return NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks)
+        }
+        return compactor, plannerFactory, nil
     }
 )
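
Both grouper factories previously discarded the no-compaction count into an unregistered `prometheus.NewCounter(prometheus.CounterOpts{})`; they now receive the real `blocksMarkedForNoCompaction` counter. As a hedged sketch, a custom factory satisfying the widened `BlocksGrouperFactory` signature (assuming it lives in `pkg/compactor` and reuses its existing imports) could look like:

```go
// A stand-in grouper factory: any implementation must now accept the extra
// blocksMarkedForNoCompaction counter and hand it to the underlying grouper,
// so marked blocks are counted instead of vanishing into a throwaway counter.
var _ BlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket,
	logger log.Logger, reg prometheus.Registerer,
	blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter,
) compact.Grouper {
	return compact.NewDefaultGrouper(
		logger, bkt,
		false, // do not accept malformed indexes
		true,  // enable vertical compaction
		reg,
		blocksMarkedForDeletion,
		garbageCollectedBlocks,
		blocksMarkedForNoCompaction,
		metadata.NoneFunc)
}
```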

@@ -107,6 +113,7 @@ type BlocksGrouperFactory func(
     logger log.Logger,
     reg prometheus.Registerer,
     blocksMarkedForDeletion prometheus.Counter,
+    blocksMarkedForNoCompact prometheus.Counter,
     garbageCollectedBlocks prometheus.Counter,
 ) compact.Grouper

@@ -116,22 +123,29 @@ type BlocksCompactorFactory func(
     cfg Config,
     logger log.Logger,
     reg prometheus.Registerer,
-) (compact.Compactor, compact.Planner, error)
+) (compact.Compactor, PlannerFactory, error)
+
+type PlannerFactory func(
+    logger log.Logger,
+    cfg Config,
+    noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
+) compact.Planner
 
 // Config holds the Compactor config.
 type Config struct {
-    BlockRanges           cortex_tsdb.DurationList `yaml:"block_ranges"`
-    BlockSyncConcurrency  int                      `yaml:"block_sync_concurrency"`
-    MetaSyncConcurrency   int                      `yaml:"meta_sync_concurrency"`
-    ConsistencyDelay      time.Duration            `yaml:"consistency_delay"`
-    DataDir               string                   `yaml:"data_dir"`
-    CompactionInterval    time.Duration            `yaml:"compaction_interval"`
-    CompactionRetries     int                      `yaml:"compaction_retries"`
-    CompactionConcurrency int                      `yaml:"compaction_concurrency"`
-    CleanupInterval       time.Duration            `yaml:"cleanup_interval"`
-    CleanupConcurrency    int                      `yaml:"cleanup_concurrency"`
-    DeletionDelay         time.Duration            `yaml:"deletion_delay"`
-    TenantCleanupDelay    time.Duration            `yaml:"tenant_cleanup_delay"`
+    BlockRanges                           cortex_tsdb.DurationList `yaml:"block_ranges"`
+    BlockSyncConcurrency                  int                      `yaml:"block_sync_concurrency"`
+    MetaSyncConcurrency                   int                      `yaml:"meta_sync_concurrency"`
+    ConsistencyDelay                      time.Duration            `yaml:"consistency_delay"`
+    DataDir                               string                   `yaml:"data_dir"`
+    CompactionInterval                    time.Duration            `yaml:"compaction_interval"`
+    CompactionRetries                     int                      `yaml:"compaction_retries"`
+    CompactionConcurrency                 int                      `yaml:"compaction_concurrency"`
+    CleanupInterval                       time.Duration            `yaml:"cleanup_interval"`
+    CleanupConcurrency                    int                      `yaml:"cleanup_concurrency"`
+    DeletionDelay                         time.Duration            `yaml:"deletion_delay"`
+    TenantCleanupDelay                    time.Duration            `yaml:"tenant_cleanup_delay"`
+    SkipBlocksWithOutOfOrderChunksEnabled bool                     `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
 
     // Whether the migration of block deletion marks to the global markers location is enabled.
     BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`
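
The move from a concrete `compact.Planner` to a `PlannerFactory` is deliberate: the planner now depends on a `GatherNoCompactionMarkFilter`, and that filter is built per tenant inside `compactUser` (see the hunks below), so the planner can no longer be constructed once at startup. A sketch of a conforming factory, mirroring what `DefaultBlocksCompactorFactory` does above (`myPlannerFactory` is a hypothetical name):

```go
// The factory defers planner construction until the per-tenant no-compaction
// filter exists; the filter is populated when the tenant's metadata fetcher
// runs, so the planner only ever sees marks gathered for that tenant.
var myPlannerFactory PlannerFactory = func(logger log.Logger, cfg Config,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
) compact.Planner {
	return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
}
```
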
@@ -180,6 +194,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
         "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
     f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.")
     f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
+    f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.")
 
     f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
     f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
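
With the flag registered, the feature can also be switched on from the command line; assuming a single-binary run with `-target=compactor`, something like:

```sh
cortex -target=compactor \
  -compactor.skip-blocks-with-out-of-order-chunks-enabled=true
```
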
@@ -231,9 +246,10 @@ type Compactor struct {
     // Blocks cleaner is responsible to hard delete blocks marked for deletion.
     blocksCleaner *BlocksCleaner
 
-    // Underlying compactor and planner used to compact TSDB blocks.
+    // Underlying compactor used to compact TSDB blocks.
     blocksCompactor compact.Compactor
-    blocksPlanner   compact.Planner
+
+    blocksPlannerFactory PlannerFactory
 
     // Client used to run operations on the bucket storing blocks.
     bucketClient objstore.Bucket
@@ -255,6 +271,7 @@ type Compactor struct {
     compactionRunFailedTenants  prometheus.Gauge
     compactionRunInterval       prometheus.Gauge
     blocksMarkedForDeletion     prometheus.Counter
+    blocksMarkedForNoCompaction prometheus.Counter
     garbageCollectedBlocks      prometheus.Counter
 
     // TSDB syncer metrics
@@ -357,6 +374,10 @@ func newCompactor(
             Help:        blocksMarkedForDeletionHelp,
             ConstLabels: prometheus.Labels{"reason": "compaction"},
         }),
+        blocksMarkedForNoCompaction: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+            Name: "cortex_compactor_blocks_marked_for_no_compaction_total",
+            Help: "Total number of blocks marked for no compact during a compaction run.",
+        }),
         garbageCollectedBlocks: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
            Name: "cortex_compactor_garbage_collected_blocks_total",
            Help: "Total number of blocks marked for deletion by compactor.",
@@ -389,7 +410,7 @@ func (c *Compactor) starting(ctx context.Context) error {
     }
 
     // Create blocks compactor dependencies.
-    c.blocksCompactor, c.blocksPlanner, err = c.blocksCompactorFactory(ctx, c.compactorCfg, c.logger, c.registerer)
+    c.blocksCompactor, c.blocksPlannerFactory, err = c.blocksCompactorFactory(ctx, c.compactorCfg, c.logger, c.registerer)
     if err != nil {
         return errors.Wrap(err, "failed to initialize compactor dependencies")
     }
@@ -657,6 +678,10 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
         0,
         c.compactorCfg.MetaSyncConcurrency)
 
+    // Filters out blocks with a no-compaction marker; blocks can be marked for no
+    // compaction for reasons like out-of-order chunks or an index file that is too big.
+    noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)
+
     fetcher, err := block.NewMetaFetcher(
         ulogger,
         c.compactorCfg.MetaSyncConcurrency,
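
"Marking for no compact" means the Thanos machinery writes a `no-compact-mark.json` next to the block's `meta.json` in object storage, which this filter later gathers. A rough sketch of such a marker, assuming the field layout of Thanos's `metadata.NoCompactMark` (the ULID is made up, and the exact `reason` string depends on the Thanos version in use):

```json
{
  "id": "01FN3V9SQNV4H8YRT2EJD50KXJ",
  "version": 1,
  "details": "out-of-order chunk detected during compaction",
  "no_compact_time": 1637000000,
  "reason": "block-index-out-of-order-chunk"
}
```
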
@@ -671,6 +696,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
             block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
             ignoreDeletionMarkFilter,
             deduplicateBlocksFilter,
+            noCompactMarkerFilter,
         },
         nil,
     )
@@ -696,13 +722,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
     compactor, err := compact.NewBucketCompactor(
         ulogger,
         syncer,
-        c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks),
-        c.blocksPlanner,
+        c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks),
+        c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter),
         c.blocksCompactor,
         path.Join(c.compactorCfg.DataDir, "compact"),
         bucket,
         c.compactorCfg.CompactionConcurrency,
-        false,
+        c.compactorCfg.SkipBlocksWithOutOfOrderChunksEnabled,
     )
     if err != nil {
         return errors.Wrap(err, "failed to create bucket compactor")
