Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload blocks' no-compaction marks to the global location and introduce a new cortex_bucket_blocks_marked_for_no_compaction_count metric #4729

Merged
merged 9 commits into from
May 19, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
* [BUGFIX] Query Frontend: If 'LogQueriesLongerThan' is set to < 0, log all queries as described in the docs. #4633
* [BUGFIX] Distributor: update defaultReplicationStrategy to not fail with extend-write when a single instance is unhealthy. #4636
* [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716
* [ENHANCEMENT] Compactor: upload blocks' no-compaction marks to the global markers location and introduce a new metric #4729
* `cortex_bucket_blocks_marked_for_no_compaction_count`: Total number of blocks marked for no compaction in the bucket.

## 1.11.0 2021-11-25

Expand Down
44 changes: 26 additions & 18 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ type BlocksCleaner struct {
lastOwnedUsers []string

// Metrics.
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantMarkedBlocks *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
Expand Down Expand Up @@ -102,10 +103,14 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use
Name: "cortex_bucket_blocks_count",
Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.",
}, []string{"user"}),
tenantMarkedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_marked_for_deletion_count",
Help: "Total number of blocks marked for deletion in the bucket.",
}, []string{"user"}),
tenantBlocksMarkedForNoCompaction: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_marked_for_no_compaction_count",
Help: "Total number of blocks marked for no compaction in the bucket.",
}, []string{"user"}),
tenantPartialBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_partials_count",
Help: "Total number of partial blocks.",
Expand Down Expand Up @@ -168,7 +173,8 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error {
for _, userID := range c.lastOwnedUsers {
if !isActive[userID] && !isDeleted[userID] {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
}
Expand Down Expand Up @@ -231,15 +237,16 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
// to delete. We also consider them all marked for deletion given the next run will try
// to delete them again.
c.tenantBlocks.WithLabelValues(userID).Set(float64(failed))
c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(failed))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(failed))
c.tenantPartialBlocks.WithLabelValues(userID).Set(0)

return errors.Errorf("failed to delete %d blocks", failed)
}

// Given all blocks have been deleted, we can also remove the metrics.
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)

if deletedBlocks > 0 {
Expand Down Expand Up @@ -330,7 +337,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b

// Generate an updated in-memory version of the bucket index.
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
idx, partials, err := w.UpdateIndex(ctx, idx)
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
if err != nil {
return err
}
Expand Down Expand Up @@ -367,9 +374,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
}

c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))

return nil
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json")
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))

// No Compact blocks marker
createTSDBBlock(t, bucketClient, "user-5", 10, 30, nil)
block12 := createTSDBBlock(t, bucketClient, "user-5", 30, 50, nil)
createNoCompactionMark(t, bucketClient, "user-5", block12)

// The fixtures have been created. If the bucket client wasn't wrapped to write
// deletion marks to the global location too, then this is the right time to do it.
if options.markersMigrationEnabled {
Expand Down Expand Up @@ -202,17 +207,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
# TYPE cortex_bucket_blocks_count gauge
cortex_bucket_blocks_count{user="user-1"} 2
cortex_bucket_blocks_count{user="user-2"} 1
cortex_bucket_blocks_count{user="user-5"} 2
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
cortex_bucket_blocks_marked_for_deletion_count{user="user-5"} 0
# HELP cortex_bucket_blocks_marked_for_no_compaction_count Total number of blocks marked for no compaction in the bucket.
# TYPE cortex_bucket_blocks_marked_for_no_compaction_count gauge
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-1"} 0
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-2"} 0
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-5"} 1
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 2
cortex_bucket_blocks_partials_count{user="user-2"} 0
cortex_bucket_blocks_partials_count{user="user-5"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_marked_for_no_compaction_count",
"cortex_bucket_blocks_partials_count",
))
}
Expand Down Expand Up @@ -421,7 +435,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil)

w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger)
idx, _, err := w.UpdateIndex(ctx, nil)
idx, _, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)

assert.ElementsMatch(t, []ulid.ULID{id1, id2, id3}, idx.Blocks.GetULIDs())
Expand Down
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,14 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
}

// createNoCompactionMark uploads a no-compaction mark for the given block to
// the tenant's bucket location, mirroring what the compactor writes for real blocks.
func createNoCompactionMark(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) {
	markPath := path.Join(userID, blockID.String(), metadata.NoCompactMarkFilename)
	content := mockNoCompactBlockJSON(blockID.String())

	require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
}

func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
var compactor *Compactor
var log *concurrency.SyncBuffer
Expand Down
61 changes: 46 additions & 15 deletions pkg/storage/tsdb/bucketindex/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func BlockDeletionMarkFilepath(blockID ulid.ULID) string {
return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), metadata.DeletionMarkFilename)
}

// NoCompactMarkFilenameMarkFilepath returns the path, relative to the tenant's
// bucket location, of a block's no-compact mark in the bucket markers location.
// NOTE(review): the name has a redundant "FilenameMark" segment — consider
// renaming to NoCompactMarkFilepath in a follow-up (callers would need updating).
func NoCompactMarkFilenameMarkFilepath(blockID ulid.ULID) string {
	return MarkersPathname + "/" + blockID.String() + "-" + metadata.NoCompactMarkFilename
}

// IsBlockDeletionMarkFilename returns whether the input filename matches the expected pattern
// of block deletion markers stored in the markers location.
func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) {
Expand All @@ -45,13 +51,36 @@ func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) {
return id, err == nil
}

// IsBlockNoCompactMarkFilename returns whether the input filename matches the expected pattern
// of block no-compact markers stored in the markers location, and the parsed block ID when it does.
func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) {
	parts := strings.SplitN(name, "-", 2)
	if len(parts) != 2 {
		return ulid.ULID{}, false
	}

	// Ensure the 2nd part matches the block no-compact mark filename.
	if parts[1] != metadata.NoCompactMarkFilename {
		return ulid.ULID{}, false
	}

	// Ensure the 1st part is a valid block ID.
	id, err := ulid.Parse(filepath.Base(parts[0]))
	return id, err == nil
}

// MigrateBlockDeletionMarksToGlobalLocation list all tenant's blocks and, for each of them, look for
// a deletion mark in the block location. Found deletion marks are copied to the global markers location.
// The migration continues on error and returns once all blocks have been checked.
func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) error {
bucket := bucket.NewUserBucketClient(userID, bkt, cfgProvider)
userBucket := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr)

marks := map[string]func(ulid.ULID) string{
metadata.DeletionMarkFilename: BlockDeletionMarkFilepath,
metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath,
}

// Find all blocks in the storage.
var blocks []ulid.ULID
err := userBucket.Iter(ctx, "", func(name string) error {
Expand All @@ -67,22 +96,24 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore
errs := tsdb_errors.NewMulti()

for _, blockID := range blocks {
// Look up the deletion mark (if any).
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename))
if userBucket.IsObjNotFoundErr(err) {
continue
} else if err != nil {
errs.Add(err)
continue
}
for mark, globalFilePath := range marks {
// Look up mark (if any).
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), mark))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who is mark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark can now be either one: the deletion mark or the no-compact mark.

if userBucket.IsObjNotFoundErr(err) {
continue
} else if err != nil {
errs.Add(err)
continue
}

// Upload it to the global markers location.
uploadErr := userBucket.Upload(ctx, BlockDeletionMarkFilepath(blockID), reader)
if closeErr := reader.Close(); closeErr != nil {
errs.Add(closeErr)
}
if uploadErr != nil {
errs.Add(uploadErr)
// Upload it to the global markers location.
uploadErr := userBucket.Upload(ctx, globalFilePath(blockID), reader)
if closeErr := reader.Close(); closeErr != nil {
errs.Add(closeErr)
}
if uploadErr != nil {
errs.Add(uploadErr)
}
}
}

Expand Down
31 changes: 21 additions & 10 deletions pkg/storage/tsdb/bucketindex/markers_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket {

// Upload implements objstore.Bucket.
func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader) error {
blockID, ok := b.isBlockDeletionMark(name)
globalMarkPath, ok := b.isMark(name)
if !ok {
return b.parent.Upload(ctx, name, r)
}
Expand All @@ -46,7 +46,6 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read
}

// Upload it to the global markers location too.
globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
return b.parent.Upload(ctx, globalMarkPath, bytes.NewBuffer(body))
}

Expand All @@ -58,8 +57,7 @@ func (b *globalMarkersBucket) Delete(ctx context.Context, name string) error {
}

// Delete the marker in the global markers location too.
if blockID, ok := b.isBlockDeletionMark(name); ok {
globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
if globalMarkPath, ok := b.isMark(name); ok {
if err := b.parent.Delete(ctx, globalMarkPath); err != nil {
if !b.parent.IsObjNotFoundErr(err) {
return err
Expand Down Expand Up @@ -128,12 +126,25 @@ func (b *globalMarkersBucket) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpe
return b
}

func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool) {
if path.Base(name) != metadata.DeletionMarkFilename {
return ulid.ULID{}, false
func (b *globalMarkersBucket) isMark(name string) (string, bool) {
marks := map[string]func(ulid.ULID) string{
metadata.DeletionMarkFilename: BlockDeletionMarkFilepath,
metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath,
}

// Parse the block ID in the path. If there's not block ID, then it's not the per-block
// deletion mark.
return block.IsBlockDir(path.Dir(name))
for mark, globalFilePath := range marks {
if path.Base(name) == mark {
// Parse the block ID in the path. If there's not block ID, then it's not the per-block
// deletion mark.
id, ok := block.IsBlockDir(path.Dir(name))

if ok {
return path.Clean(path.Join(path.Dir(name), "../", globalFilePath(id))), ok
}

return "", ok
}
}

return "", false
}
Loading