Skip to content

Commit 528f37a

Browse files
committed
Added hashmod support for store sharding.
Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent febddd1 commit 528f37a

File tree

2 files changed

+115
-5
lines changed

2 files changed

+115
-5
lines changed

pkg/block/fetcher.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -401,14 +401,25 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
401401
return &LabelShardedMetaFilter{relabelConfig: relabelConfig}
402402
}
403403

404+
const blockIDLabel = "__block_id"
405+
404406
// Filter filters out blocks that filters blocks that have no labels after relabelling.
405407
func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
408+
lbls := make(labels.Labels, 1)
406409
for id, m := range metas {
407-
if processedLabels := relabel.Process(labels.FromMap(m.Thanos.Labels), f.relabelConfig...); processedLabels != nil {
408-
continue
410+
lbls[0] = labels.Label{Name: blockIDLabel, Value: id.String()}
411+
lbls = lbls[:1]
412+
if len(lbls) < len(m.Thanos.Labels) {
413+
lbls = make([]labels.Label, 0, len(m.Thanos.Labels))
414+
}
415+
for k, v := range m.Thanos.Labels {
416+
lbls = append(lbls, labels.Label{Name: k, Value: v})
417+
}
418+
419+
if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 {
420+
synced.WithLabelValues(labelExcludedMeta).Inc()
421+
delete(metas, id)
409422
}
410-
synced.WithLabelValues(labelExcludedMeta).Inc()
411-
delete(metas, id)
412423
}
413424
}
414425

pkg/block/fetcher_test.go

+100-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
278278
})
279279
}
280280

281-
func TestLabelShardedMetaFilter_Filter(t *testing.T) {
281+
func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
282282
relabelContentYaml := `
283283
- action: drop
284284
regex: "A"
@@ -340,6 +340,105 @@ func TestLabelShardedMetaFilter_Filter(t *testing.T) {
340340

341341
}
342342

343+
func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
344+
relabelContentYamlFmt := `
345+
- action: hashmod
346+
source_labels: ["%s"]
347+
target_label: shard
348+
modulus: 3
349+
- action: keep
350+
source_labels: ["shard"]
351+
regex: %d
352+
`
353+
for i := 0; i < 3; i++ {
354+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
355+
var relabelConfig []*relabel.Config
356+
testutil.Ok(t, yaml.Unmarshal([]byte(fmt.Sprintf(relabelContentYamlFmt, blockIDLabel, i)), &relabelConfig))
357+
358+
f := NewLabelShardedMetaFilter(relabelConfig)
359+
360+
input := map[ulid.ULID]*metadata.Meta{
361+
ULID(1): {
362+
Thanos: metadata.Thanos{
363+
Labels: map[string]string{"cluster": "B", "message": "keepme"},
364+
},
365+
},
366+
ULID(2): {
367+
Thanos: metadata.Thanos{
368+
Labels: map[string]string{"something": "A", "message": "keepme"},
369+
},
370+
},
371+
ULID(3): {
372+
Thanos: metadata.Thanos{
373+
Labels: map[string]string{"cluster": "A", "message": "keepme"},
374+
},
375+
},
376+
ULID(4): {
377+
Thanos: metadata.Thanos{
378+
Labels: map[string]string{"cluster": "A", "something": "B", "message": "keepme"},
379+
},
380+
},
381+
ULID(5): {
382+
Thanos: metadata.Thanos{
383+
Labels: map[string]string{"cluster": "B"},
384+
},
385+
},
386+
ULID(6): {
387+
Thanos: metadata.Thanos{
388+
Labels: map[string]string{"cluster": "B", "message": "keepme"},
389+
},
390+
},
391+
ULID(7): {},
392+
ULID(8): {},
393+
ULID(9): {},
394+
ULID(10): {},
395+
ULID(11): {},
396+
ULID(12): {},
397+
ULID(13): {},
398+
ULID(14): {},
399+
ULID(15): {},
400+
}
401+
expected := map[ulid.ULID]*metadata.Meta{}
402+
switch i {
403+
case 0:
404+
expected = map[ulid.ULID]*metadata.Meta{
405+
ULID(11): input[ULID(11)],
406+
ULID(13): input[ULID(13)],
407+
}
408+
case 1:
409+
expected = map[ulid.ULID]*metadata.Meta{
410+
ULID(1): input[ULID(1)],
411+
ULID(2): input[ULID(2)],
412+
ULID(3): input[ULID(3)],
413+
ULID(4): input[ULID(4)],
414+
ULID(5): input[ULID(5)],
415+
ULID(6): input[ULID(6)],
416+
ULID(7): input[ULID(7)],
417+
ULID(10): input[ULID(10)],
418+
ULID(12): input[ULID(12)],
419+
ULID(14): input[ULID(14)],
420+
ULID(15): input[ULID(15)],
421+
}
422+
case 2:
423+
expected = map[ulid.ULID]*metadata.Meta{
424+
ULID(8): input[ULID(8)],
425+
ULID(9): input[ULID(9)],
426+
}
427+
}
428+
deleted := len(input) - len(expected)
429+
430+
synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
431+
f.Filter(input, synced, false)
432+
433+
testutil.Equals(t, len(expected), len(input))
434+
testutil.Equals(t, expected, input)
435+
testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta)))
436+
437+
})
438+
439+
}
440+
}
441+
343442
func TestTimePartitionMetaFilter_Filter(t *testing.T) {
344443
mint := time.Unix(0, 1*time.Millisecond.Nanoseconds())
345444
maxt := time.Unix(0, 10*time.Millisecond.Nanoseconds())

0 commit comments

Comments
 (0)