Skip to content

Commit 6758f1f

Browse files
authored
Fix/concurrent map interaction (#4716)
* Fix race condition on /series where late response cause concurrent map iteration error * Update Changelog * Update Changelog * Fixing lint * fix data race
1 parent b086b82 commit 6758f1f

File tree

3 files changed

+40
-6
lines changed

3 files changed

+40
-6
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction
1010
* [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics
1111
* [FEATURE] Add shuffle sharding for the compactor #4433
12+
* [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716
1213

1314
## 1.12.0 in progress
1415

pkg/distributor/distributor.go

+2
Original file line numberDiff line numberDiff line change
@@ -980,12 +980,14 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
980980
return nil, err
981981
}
982982

983+
mutex.Lock()
983984
result := make([]metric.Metric, 0, len(metrics))
984985
for _, m := range metrics {
985986
result = append(result, metric.Metric{
986987
Metric: m,
987988
})
988989
}
990+
mutex.Unlock()
989991
return result, nil
990992
}
991993

pkg/distributor/distributor_test.go

+37-6
Original file line numberDiff line numberDiff line change
@@ -1841,6 +1841,36 @@ func TestSlowQueries(t *testing.T) {
18411841
}
18421842
}
18431843

1844+
func TestDistributor_MetricsForLabelMatchers_SingleSlowIngester(t *testing.T) {
1845+
// Create distributor
1846+
ds, ing, _, _ := prepare(t, prepConfig{
1847+
numIngesters: 3,
1848+
happyIngesters: 3,
1849+
numDistributors: 1,
1850+
shardByAllLabels: true,
1851+
shuffleShardEnabled: true,
1852+
shuffleShardSize: 3,
1853+
replicationFactor: 3,
1854+
})
1855+
1856+
ing[2].queryDelay = 50 * time.Millisecond
1857+
1858+
ctx := user.InjectOrgID(context.Background(), "test")
1859+
1860+
now := model.Now()
1861+
1862+
for i := 0; i < 100; i++ {
1863+
req := mockWriteRequest([]labels.Labels{{{Name: labels.MetricName, Value: "test"}, {Name: "app", Value: "m"}, {Name: "uniq8", Value: strconv.Itoa(i)}}}, 1, now.Unix())
1864+
_, err := ds[0].Push(ctx, req)
1865+
require.NoError(t, err)
1866+
}
1867+
1868+
for i := 0; i < 50; i++ {
1869+
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
1870+
require.NoError(t, err)
1871+
}
1872+
}
1873+
18441874
func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
18451875
const numIngesters = 5
18461876

@@ -2192,10 +2222,10 @@ type prepConfig struct {
21922222
errFail error
21932223
}
21942224

2195-
func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
2196-
ingesters := []mockIngester{}
2225+
func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) {
2226+
ingesters := []*mockIngester{}
21972227
for i := 0; i < cfg.happyIngesters; i++ {
2198-
ingesters = append(ingesters, mockIngester{
2228+
ingesters = append(ingesters, &mockIngester{
21992229
happy: *atomic.NewBool(true),
22002230
queryDelay: cfg.queryDelay,
22012231
})
@@ -2206,7 +2236,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*
22062236
miError = cfg.errFail
22072237
}
22082238

2209-
ingesters = append(ingesters, mockIngester{
2239+
ingesters = append(ingesters, &mockIngester{
22102240
queryDelay: cfg.queryDelay,
22112241
failResp: *atomic.NewError(miError),
22122242
})
@@ -2225,7 +2255,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*
22252255
RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(),
22262256
Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)},
22272257
}
2228-
ingestersByAddr[addr] = &ingesters[i]
2258+
ingestersByAddr[addr] = ingesters[i]
22292259
}
22302260

22312261
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
@@ -2637,6 +2667,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
26372667
}
26382668

26392669
func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) {
2670+
time.Sleep(i.queryDelay)
26402671
i.Lock()
26412672
defer i.Unlock()
26422673

@@ -3098,7 +3129,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
30983129
require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...))
30993130
}
31003131

3101-
func countMockIngestersCalls(ingesters []mockIngester, name string) int {
3132+
func countMockIngestersCalls(ingesters []*mockIngester, name string) int {
31023133
count := 0
31033134
for i := 0; i < len(ingesters); i++ {
31043135
if ingesters[i].countCalls(name) > 0 {

0 commit comments

Comments
 (0)