Skip to content

Commit 93e0515

Browse files
committed
Added hashmod support for store sharding.
Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent febddd1 commit 93e0515

File tree

2 files changed

+117
-7
lines changed

2 files changed

+117
-7
lines changed

pkg/block/fetcher.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, syn
391391

392392
var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter
393393

394-
// LabelShardedMetaFilter is a MetaFetcher filter that filters out blocks that have no labels after relabelling.
394+
// LabelShardedMetaFilter represents struct that allows sharding.
395395
type LabelShardedMetaFilter struct {
396396
relabelConfig []*relabel.Config
397397
}
@@ -401,14 +401,26 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
401401
return &LabelShardedMetaFilter{relabelConfig: relabelConfig}
402402
}
403403

404-
// Filter filters out blocks that filters blocks that have no labels after relabelling.
404+
// Special block that will have ULID of the meta.json being referenced too.
405+
const blockIDLabel = "__block_id"
406+
407+
// Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels.
405408
func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
409+
var lbls labels.Labels
406410
for id, m := range metas {
407-
if processedLabels := relabel.Process(labels.FromMap(m.Thanos.Labels), f.relabelConfig...); processedLabels != nil {
408-
continue
411+
lbls = lbls[:0]
412+
if cap(lbls) < len(m.Thanos.Labels)+1 {
413+
lbls = make([]labels.Label, 0, len(m.Thanos.Labels)+1)
414+
}
415+
lbls = append(lbls, labels.Label{Name: blockIDLabel, Value: id.String()})
416+
for k, v := range m.Thanos.Labels {
417+
lbls = append(lbls, labels.Label{Name: k, Value: v})
418+
}
419+
420+
if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 {
421+
synced.WithLabelValues(labelExcludedMeta).Inc()
422+
delete(metas, id)
409423
}
410-
synced.WithLabelValues(labelExcludedMeta).Inc()
411-
delete(metas, id)
412424
}
413425
}
414426

pkg/block/fetcher_test.go

+99-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
278278
})
279279
}
280280

281-
func TestLabelShardedMetaFilter_Filter(t *testing.T) {
281+
func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
282282
relabelContentYaml := `
283283
- action: drop
284284
regex: "A"
@@ -340,6 +340,104 @@ func TestLabelShardedMetaFilter_Filter(t *testing.T) {
340340

341341
}
342342

343+
func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
344+
relabelContentYamlFmt := `
345+
- action: hashmod
346+
source_labels: ["%s"]
347+
target_label: shard
348+
modulus: 3
349+
- action: keep
350+
source_labels: ["shard"]
351+
regex: %d
352+
`
353+
for i := 0; i < 3; i++ {
354+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
355+
var relabelConfig []*relabel.Config
356+
testutil.Ok(t, yaml.Unmarshal([]byte(fmt.Sprintf(relabelContentYamlFmt, blockIDLabel, i)), &relabelConfig))
357+
358+
f := NewLabelShardedMetaFilter(relabelConfig)
359+
360+
input := map[ulid.ULID]*metadata.Meta{
361+
ULID(1): {
362+
Thanos: metadata.Thanos{
363+
Labels: map[string]string{"cluster": "B", "message": "keepme"},
364+
},
365+
},
366+
ULID(2): {
367+
Thanos: metadata.Thanos{
368+
Labels: map[string]string{"something": "A", "message": "keepme"},
369+
},
370+
},
371+
ULID(3): {
372+
Thanos: metadata.Thanos{
373+
Labels: map[string]string{"cluster": "A", "message": "keepme"},
374+
},
375+
},
376+
ULID(4): {
377+
Thanos: metadata.Thanos{
378+
Labels: map[string]string{"cluster": "A", "something": "B", "message": "keepme"},
379+
},
380+
},
381+
ULID(5): {
382+
Thanos: metadata.Thanos{
383+
Labels: map[string]string{"cluster": "B"},
384+
},
385+
},
386+
ULID(6): {
387+
Thanos: metadata.Thanos{
388+
Labels: map[string]string{"cluster": "B", "message": "keepme"},
389+
},
390+
},
391+
ULID(7): {},
392+
ULID(8): {},
393+
ULID(9): {},
394+
ULID(10): {},
395+
ULID(11): {},
396+
ULID(12): {},
397+
ULID(13): {},
398+
ULID(14): {},
399+
ULID(15): {},
400+
}
401+
expected := map[ulid.ULID]*metadata.Meta{}
402+
switch i {
403+
case 0:
404+
expected = map[ulid.ULID]*metadata.Meta{
405+
ULID(2): input[ULID(2)],
406+
ULID(6): input[ULID(6)],
407+
ULID(11): input[ULID(11)],
408+
ULID(13): input[ULID(13)],
409+
}
410+
case 1:
411+
expected = map[ulid.ULID]*metadata.Meta{
412+
ULID(5): input[ULID(5)],
413+
ULID(7): input[ULID(7)],
414+
ULID(10): input[ULID(10)],
415+
ULID(12): input[ULID(12)],
416+
ULID(14): input[ULID(14)],
417+
ULID(15): input[ULID(15)],
418+
}
419+
case 2:
420+
expected = map[ulid.ULID]*metadata.Meta{
421+
ULID(1): input[ULID(1)],
422+
ULID(3): input[ULID(3)],
423+
ULID(4): input[ULID(4)],
424+
ULID(8): input[ULID(8)],
425+
ULID(9): input[ULID(9)],
426+
}
427+
}
428+
deleted := len(input) - len(expected)
429+
430+
synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
431+
f.Filter(input, synced, false)
432+
433+
testutil.Equals(t, expected, input)
434+
testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta)))
435+
436+
})
437+
438+
}
439+
}
440+
343441
func TestTimePartitionMetaFilter_Filter(t *testing.T) {
344442
mint := time.Unix(0, 1*time.Millisecond.Nanoseconds())
345443
maxt := time.Unix(0, 10*time.Millisecond.Nanoseconds())

0 commit comments

Comments
 (0)