Add shuffle-sharding for the compactor #4433

Merged (17 commits, Apr 7, 2022)
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -8,6 +8,7 @@
* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction
* [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics
* [FEATURE] Add shuffle sharding for the compactor #4433

## 1.12.0 in progress

@@ -16,7 +17,6 @@
* [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597
* [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. #4601
* [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624
* [ENHANCEMENT] Update Go version to 1.17.8. #4602 #4604 #4658
* [ENHANCEMENT] Keep track of discarded samples due to bad relabel configuration in `cortex_discarded_samples_total`. #4503
* [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4267,6 +4267,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -compactor.blocks-retention-period
[compactor_blocks_retention_period: <duration> | default = 0s]

# The default tenant's shard size when the shuffle-sharding strategy is used by
# the compactor. When this setting is specified in the per-tenant overrides, a
# value of 0 disables shuffle sharding for the tenant.
# CLI flag: -compactor.tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
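For illustration only (not part of this diff): the new `compactor_tenant_shard_size` limit would normally be set globally under the `limits` block and overridden per tenant through the runtime overrides file. The tenant ID `tenant-a` and the shard sizes below are made-up examples.

```yaml
# Main Cortex config: default shard size for all tenants (illustrative value).
limits:
  compactor_tenant_shard_size: 2

# Runtime overrides file: per-tenant override; "tenant-a" is a hypothetical
# tenant ID. Setting the value to 0 here disables shuffle sharding for it.
overrides:
  tenant-a:
    compactor_tenant_shard_size: 4
```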
13 changes: 13 additions & 0 deletions docs/guides/shuffle-sharding.md
@@ -54,6 +54,7 @@ Cortex currently supports shuffle sharding in the following services:
- [Query-frontend / Query-scheduler](#query-frontend-and-query-scheduler-shuffle-sharding)
- [Store-gateway](#store-gateway-shuffle-sharding)
- [Ruler](#ruler-shuffle-sharding)
- [Compactor](#compactor-shuffle-sharding)

Shuffle sharding is **disabled by default** and needs to be explicitly enabled in the configuration.

@@ -154,6 +155,18 @@ Cortex ruler can run in three modes:

Note that when using the sharding strategy, each rule group is evaluated by a single ruler only; there is no replication.

### Compactor shuffle sharding

Cortex compactor can run in three modes:

1. **No sharding at all.** This is the most basic mode of the compactor. It is activated by using `-compactor.sharding-enabled=false` (default). In this mode, every compactor runs every compaction.
2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode, compactors register themselves in the ring and each tenant belongs to exactly one compactor.
3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. Similar to default sharding, but compactions for each tenant can be carried out by multiple compactors, controlled by `-compactor.tenant-shard-size` (which can also be set per tenant as `compactor_tenant_shard_size` in the overrides).

With shuffle sharding selected as the sharding strategy, a subset of the compactors, determined by the tenant's shard size, handles each tenant.

The idea behind using the shuffle-sharding strategy for the compactor is to further improve horizontal scalability and to tolerate compactions that take longer than the compaction interval.
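For illustration only (not part of this diff), a minimal configuration enabling compactor shuffle sharding could look like the following sketch; the shard size of 2 is a made-up value.

```yaml
# YAML keys correspond to the -compactor.sharding-* CLI flags shown above.
compactor:
  sharding_enabled: true
  sharding_strategy: shuffle-sharding

limits:
  # Default number of compactors handling each tenant; can be overridden
  # per tenant via compactor_tenant_shard_size in the overrides.
  compactor_tenant_shard_size: 2
```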

## FAQ

### Does shuffle sharding add additional overhead to the KV store?
78 changes: 67 additions & 11 deletions pkg/compactor/compactor.go
@@ -34,6 +34,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
@@ -50,8 +51,9 @@ var (

supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
return compact.NewDefaultGrouper(
logger,
bkt,
@@ -64,7 +66,7 @@
metadata.NoneFunc)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
return NewShuffleShardingGrouper(
logger,
bkt,
@@ -74,8 +76,13 @@
blocksMarkedForDeletion,
blocksMarkedForNoCompaction,
garbageCollectedBlocks,
remainingPlannedCompactions,
metadata.NoneFunc,
cfg)
cfg,
ring,
ringLifecycle.Addr,
limits,
userID)
}

DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -115,6 +122,11 @@ type BlocksGrouperFactory func(
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
remainingPlannedCompactions prometheus.Gauge,
ring *ring.Ring,
ringLifecycler *ring.Lifecycler,
limit Limits,
userID string,
) compact.Grouper

// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -131,6 +143,11 @@ type PlannerFactory func(
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
) compact.Planner

// Limits defines limits used by the Compactor.
type Limits interface {
CompactorTenantShardSize(userID string) int
}

// Config holds the Compactor config.
type Config struct {
BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"`
@@ -200,7 +217,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
}

func (cfg *Config) Validate() error {
func (cfg *Config) Validate(limits validation.Limits) error {
// Each block range period should be divisible by the previous one.
for i := 1; i < len(cfg.BlockRanges); i++ {
if cfg.BlockRanges[i]%cfg.BlockRanges[i-1] != 0 {
@@ -213,6 +230,12 @@ func (cfg *Config) Validate() error {
return errInvalidShardingStrategy
}

if cfg.ShardingEnabled && cfg.ShardingStrategy == util.ShardingStrategyShuffle {
if limits.CompactorTenantShardSize <= 0 {
return errInvalidTenantShardSize
}
}

return nil
}
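For illustration only (not part of this diff): anything returning a per-tenant shard size satisfies the `Limits` interface defined above. A minimal sketch, e.g. for wiring the compactor in a unit test, could look like this; in Cortex itself the value is expected to come from the per-tenant overrides.

```go
// staticLimits is a hypothetical Limits implementation that returns the same
// shard size for every tenant; useful for wiring the compactor in tests.
type staticLimits struct {
	shardSize int
}

// CompactorTenantShardSize satisfies the Limits interface added in this PR.
func (l staticLimits) CompactorTenantShardSize(userID string) int {
	return l.shardSize
}
```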

@@ -233,6 +256,7 @@ type Compactor struct {
parentLogger log.Logger
registerer prometheus.Registerer
allowedTenants *util.AllowedTenants
limits Limits

// Functions that creates bucket client, grouper, planner and compactor using the context.
// Useful for injecting mock objects from tests.
@@ -273,13 +297,14 @@ type Compactor struct {
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompaction prometheus.Counter
garbageCollectedBlocks prometheus.Counter
remainingPlannedCompactions prometheus.Gauge

// TSDB syncer metrics
syncerMetrics *syncerMetrics
}

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits Limits) (*Compactor, error) {
bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
}
@@ -302,7 +327,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
}
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory)
cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
if err != nil {
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
}
@@ -319,7 +344,15 @@ func newCompactor(
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
blocksGrouperFactory BlocksGrouperFactory,
blocksCompactorFactory BlocksCompactorFactory,
limits Limits,
) (*Compactor, error) {
var remainingPlannedCompactions prometheus.Gauge
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
})
}
c := &Compactor{
compactorCfg: compactorCfg,
storageCfg: storageCfg,
@@ -382,6 +415,8 @@ func newCompactor(
Name: "cortex_compactor_garbage_collected_blocks_total",
Help: "Total number of blocks marked for deletion by compactor.",
}),
remainingPlannedCompactions: remainingPlannedCompactions,
limits: limits,
}

if len(compactorCfg.EnabledTenants) > 0 {
Expand Down Expand Up @@ -419,7 +454,7 @@ func (c *Compactor) starting(ctx context.Context) error {
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

// Create the users scanner.
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUser, c.parentLogger)
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
@@ -573,7 +608,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
}

// Ensure the user ID belongs to our shard.
if owned, err := c.ownUser(userID); err != nil {
if owned, err := c.ownUserForCompaction(userID); err != nil {
c.compactionRunSkippedTenants.Inc()
level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
continue
@@ -722,7 +757,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks),
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID),
c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter),
c.blocksCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
@@ -775,16 +810,37 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
return users, err
}

func (c *Compactor) ownUser(userID string) (bool, error) {
func (c *Compactor) ownUserForCompaction(userID string) (bool, error) {
return c.ownUser(userID, false)
}

func (c *Compactor) ownUserForCleanUp(userID string) (bool, error) {
return c.ownUser(userID, true)
}

func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
if !c.allowedTenants.IsAllowed(userID) {
return false, nil
}

// Always owned if sharding is disabled.
// Always owned if sharding is disabled
if !c.compactorCfg.ShardingEnabled {
return true, nil
Review comments on this line:

Contributor @edma2 (Nov 16, 2021): Every Compactor now "owns" all the users, even if it doesn't actually compact any blocks for most users. One particular impact I saw is a huge growth of metadata syncs. Instead, maybe we can compute ownUser based on the shuffle shard of a tenant.

Contributor: Quick fix that seems to work: b0c2c2a

Contributor: hey @edma2 thanks for taking a look at the compactor PR. We are actively testing this branch in our beta environment, and I'm glad to hear that it is working for you.

We have a similar fix to your commit, and I can confirm that we also saw an improvement in the metadata syncs.

However, on the tenant clean-up side, we were running into issues where the deleted tenant directory was left dangling. While compactor-A is deleting the deletion markers, compactor-B is also trying to sync the data, and re-uploads the block index. We currently provide an override for the tenant shard size on the clean-up path so that only 1 compactor owns the cleanup for a given tenant.

Contributor: thanks for the tip on the cleanup side. Makes sense that you don't want multiple compactors repeating the same work and potentially conflicting.
}

// If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by a subring
// Cleanup should only be owned by a single compactor, as there could be race conditions during block deletion
if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID))

rs, err := subRing.GetAllHealthy(RingOp)
if err != nil {
return false, err
}

return rs.Includes(c.ringLifecycler.Addr), nil
}

// Hash the user ID.
hasher := fnv.New32a()
_, _ = hasher.Write([]byte(userID))
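To close, a tiny self-contained sketch of the shuffle-sharding idea used above. This is a deliberate simplification for illustration: the real Cortex ring uses consistent hashing with tokens, zones and health checks (`ring.ShuffleShard`, `GetAllHealthy`), none of which is reproduced here. The only property shown is that each tenant maps to a small, stable subset of compactors; the compactor names and the `pickShard` helper are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickShard deterministically selects shardSize compactors for a tenant.
// It is an illustrative stand-in for the ring's ShuffleShard, not the real
// algorithm: the point is only that the subset is stable per tenant.
func pickShard(tenant string, compactors []string, shardSize int) []string {
	if shardSize <= 0 || shardSize >= len(compactors) {
		return compactors // shard size 0 means "no shuffle sharding"
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(tenant))
	start := int(h.Sum32() % uint32(len(compactors)))
	shard := make([]string, 0, shardSize)
	for i := 0; i < shardSize; i++ {
		shard = append(shard, compactors[(start+i)%len(compactors)])
	}
	return shard
}

func main() {
	compactors := []string{"compactor-0", "compactor-1", "compactor-2", "compactor-3"}
	for _, tenant := range []string{"tenant-a", "tenant-b"} {
		// Each tenant is consistently handled by the same 2 compactors.
		fmt.Println(tenant, "->", pickShard(tenant, compactors, 2))
	}
}
```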