Skip to content

Commit b3c1f38

Browse files
committed
Allow 2h block conversion if it has no compact mark
Signed-off-by: SungJin1212 <[email protected]>
1 parent e77aada commit b3c1f38

File tree

8 files changed

+140
-27
lines changed

8 files changed

+140
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1919
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
2020
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
21+
* [ENHANCEMENT] Parquet Converter: Allow conversion of 2h blocks if the block has a no-compact-mark.json file. #6864
2122
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
2223
* [ENHANCEMENT] Query Frontend: Change to return 400 when tenant resolving fails. #6715
2324
* [ENHANCEMENT] Querier: Support query parameters on the metadata API (/api/v1/metadata) to allow users to limit the metadata returned. Add an `-ingester.return-all-metadata` flag controlling whether the metadata API returns all metadata; set this flag to `false` to have the metadata API apply the limits. #6681 #6744

pkg/compactor/blocks_cleaner.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
752752
}
753753
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
754754
}
755-
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))
755+
756+
noCompactMarkCheckFunc := func(blockID ulid.ULID) bool {
757+
return cortex_parquet.ExistBlockNoCompact(ctx, userBucket, userLogger, blockID)
758+
}
759+
760+
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction), noCompactMarkCheckFunc)
756761

757762
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
758763
begin = time.Now()
@@ -762,7 +767,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
762767
return nil
763768
}
764769

765-
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64) {
770+
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64, noCompactMarkCheckFunc func(blockID ulid.ULID) bool) {
766771
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
767772
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
768773
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction)
@@ -772,7 +777,7 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
772777
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
773778
remainingBlocksToConvert := 0
774779
for _, b := range idx.NonParquetBlocks() {
775-
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges) {
780+
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges, noCompactMarkCheckFunc(b.ID)) {
776781
remainingBlocksToConvert++
777782
}
778783
}

pkg/compactor/blocks_cleaner_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1109,8 +1109,12 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
11091109
},
11101110
}
11111111

1112+
mockNoCompactMarkCheckFunc := func(blockID ulid.ULID) bool {
1113+
return false
1114+
}
1115+
11121116
// Update metrics
1113-
cleaner.updateBucketMetrics("user1", true, idx, 0, 0)
1117+
cleaner.updateBucketMetrics("user1", true, idx, 0, 0, mockNoCompactMarkCheckFunc)
11141118

11151119
// Verify metrics
11161120
require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetBlocks, strings.NewReader(`

pkg/parquetconverter/converter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,8 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
385385
continue
386386
}
387387

388-
if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges) {
388+
noCompactMarkExist := cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, b.ULID)
389+
if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges, noCompactMarkExist) {
389390
continue
390391
}
391392

pkg/parquetconverter/converter_test.go

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"github.com/go-kit/log"
1313
"github.com/oklog/ulid/v2"
1414
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
1516
"github.com/prometheus/client_golang/prometheus/testutil"
1617
"github.com/prometheus/prometheus/model/labels"
1718
"github.com/prometheus/prometheus/tsdb"
1819
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/mock"
1921
"github.com/stretchr/testify/require"
2022
"github.com/thanos-io/objstore"
2123
"github.com/thanos-io/objstore/providers/filesystem"
@@ -227,6 +229,75 @@ func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) {
227229
assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
228230
}
229231

232+
func TestConverter_ShouldConvertNoCompactMarkBlocks(t *testing.T) {
233+
cfg := prepareConfig()
234+
userID := "user"
235+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
236+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
237+
dir := t.TempDir()
238+
239+
cfg.Ring.InstanceID = "parquet-converter-1"
240+
cfg.Ring.InstanceAddr = "1.2.3.4"
241+
cfg.Ring.KVStore.Mock = ringStore
242+
bucketClient, err := filesystem.NewBucket(t.TempDir())
243+
require.NoError(t, err)
244+
userBucket := bucket.NewPrefixedBucketClient(bucketClient, userID)
245+
limits := &validation.Limits{}
246+
flagext.DefaultValues(limits)
247+
limits.ParquetConverterEnabled = true
248+
249+
c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
250+
c.ringLifecycler = &ring.Lifecycler{
251+
Addr: "1.2.3.4",
252+
}
253+
254+
ctx := context.Background()
255+
256+
lbls := labels.Labels{labels.Label{
257+
Name: "__name__",
258+
Value: "test",
259+
}}
260+
261+
blocks := []ulid.ULID{}
262+
numBlocks := 2
263+
twoHour := 2 * time.Hour
264+
// Create 2h blocks
265+
for i := 0; i < numBlocks; i++ {
266+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
267+
id, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, twoHour.Milliseconds(), time.Minute.Milliseconds(), 10)
268+
require.NoError(t, err)
269+
blocks = append(blocks, id)
270+
}
271+
272+
for _, bId := range blocks {
273+
blockDir := fmt.Sprintf("%s/%s", dir, bId.String())
274+
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
275+
require.NoError(t, err)
276+
// upload block
277+
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
278+
require.NoError(t, err)
279+
280+
// upload no-compact-mark.json
281+
err = block.MarkForNoCompact(ctx, logger, userBucket, bId, metadata.ManualNoCompactReason, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
282+
require.NoError(t, err)
283+
}
284+
285+
ringMock := &ring.RingMock{}
286+
ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
287+
Instances: []ring.InstanceDesc{
288+
{
289+
Addr: "1.2.3.4",
290+
},
291+
},
292+
}, nil)
293+
294+
err = c.convertUser(ctx, log.NewNopLogger(), ringMock, userID)
295+
require.NoError(t, err)
296+
297+
// Verify the converted metric was incremented
298+
assert.Equal(t, 2.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(userID)))
299+
}
300+
230301
func TestConverter_BlockConversionFailure(t *testing.T) {
231302
// Create a new registry for testing
232303
reg := prometheus.NewRegistry()
@@ -277,7 +348,16 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
277348
Addr: "1.2.3.4",
278349
}
279350

280-
err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID)
351+
ringMock := &ring.RingMock{}
352+
ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
353+
Instances: []ring.InstanceDesc{
354+
{
355+
Addr: "1.2.3.4",
356+
},
357+
},
358+
}, nil)
359+
360+
err = converter.convertUser(context.Background(), logger, ringMock, userID)
281361
require.NoError(t, err)
282362

283363
// Verify the failure metric was incremented
@@ -292,17 +372,3 @@ type mockBucket struct {
292372
func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
293373
return fmt.Errorf("mock upload failure")
294374
}
295-
296-
type RingMock struct {
297-
ring.ReadRing
298-
}
299-
300-
func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts []string, bufZones map[string]int) (ring.ReplicationSet, error) {
301-
return ring.ReplicationSet{
302-
Instances: []ring.InstanceDesc{
303-
{
304-
Addr: "1.2.3.4",
305-
},
306-
},
307-
}, nil
308-
}

pkg/storage/parquet/converter_marker.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99

1010
"github.com/efficientgo/core/errors"
1111
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
1213
"github.com/oklog/ulid/v2"
1314
"github.com/thanos-io/objstore"
15+
"github.com/thanos-io/thanos/pkg/block/metadata"
1416
"github.com/thanos-io/thanos/pkg/runutil"
1517

1618
"github.com/cortexproject/cortex/pkg/storage/tsdb"
@@ -26,6 +28,15 @@ type ConverterMark struct {
2628
Version int `json:"version"`
2729
}
2830

31+
func ExistBlockNoCompact(ctx context.Context, userBkt objstore.InstrumentedBucket, logger log.Logger, blockID ulid.ULID) bool {
32+
noCompactMarkerExists, err := userBkt.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename))
33+
if err != nil {
34+
level.Warn(logger).Log("msg", "unable to get stats of no-compact-mark.json for block", "block", blockID.String())
35+
return false
36+
}
37+
return noCompactMarkerExists
38+
}
39+
2940
func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*ConverterMark, error) {
3041
markerPath := path.Join(id.String(), ConverterMarkerFileName)
3142
reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath)

pkg/storage/parquet/util.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package parquet
22

3-
func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64) bool {
3+
func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64, noCompactMarkExist bool) bool {
44
// We assume timeRanges[0] is the TSDB block duration (2h), and we don't convert them.
5-
return getBlockTimeRange(mint, maxt, timeRanges) > timeRanges[0]
5+
blockTimeRange := getBlockTimeRange(mint, maxt, timeRanges)
6+
if blockTimeRange > timeRanges[0] {
7+
return true
8+
}
9+
10+
if blockTimeRange == timeRanges[0] && noCompactMarkExist {
11+
return true
12+
}
13+
return false
614
}
715

816
func getBlockTimeRange(mint, maxt int64, timeRanges []int64) int64 {

pkg/storage/parquet/util_test.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ import (
1111

1212
func TestShouldConvertBlockToParquet(t *testing.T) {
1313
for _, tc := range []struct {
14-
name string
15-
mint, maxt int64
16-
durations tsdb.DurationList
17-
expected bool
14+
name string
15+
mint, maxt int64
16+
durations tsdb.DurationList
17+
expected bool
18+
noCompactMarkExist bool
1819
}{
1920
{
2021
name: "2h block. Don't convert",
@@ -30,6 +31,22 @@ func TestShouldConvertBlockToParquet(t *testing.T) {
3031
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
3132
expected: false,
3233
},
34+
{
35+
name: "2h block. Exist NoCompactMark. Convert",
36+
mint: 0,
37+
maxt: 2 * time.Hour.Milliseconds(),
38+
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
39+
expected: true,
40+
noCompactMarkExist: true,
41+
},
42+
{
43+
name: "1h block. Exist NoCompactMark. Convert",
44+
mint: 0,
45+
maxt: 1 * time.Hour.Milliseconds(),
46+
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
47+
expected: true,
48+
noCompactMarkExist: true,
49+
},
3350
{
3451
name: "3h block. Convert",
3552
mint: 0,
@@ -60,7 +77,7 @@ func TestShouldConvertBlockToParquet(t *testing.T) {
6077
},
6178
} {
6279
t.Run(tc.name, func(t *testing.T) {
63-
res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, (&tc.durations).ToMilliseconds())
80+
res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, (&tc.durations).ToMilliseconds(), tc.noCompactMarkExist)
6481
require.Equal(t, tc.expected, res)
6582
})
6683
}

0 commit comments

Comments
 (0)