
Commit 383dcce

alvinlin123 committed (co-authored by bboreham and aknuds1)
Fix bug where querier may not be able to achieve max-concurrent (cortexproject#4417)
Squashed commits:

* Fix bug where querier may not be able to achieve max-concurrent
* Update change log
* Update CHANGELOG.md (Co-authored-by: Bryan Boreham <[email protected]>)
* Update pkg/querier/worker/worker_test.go (Co-authored-by: Arve Knudsen <[email protected]>)
* Address PR comments

Signed-off-by: Alvin Lin <[email protected]>
Co-authored-by: Bryan Boreham <[email protected]>
Co-authored-by: Arve Knudsen <[email protected]>
1 parent 5098d0b commit 383dcce

File tree

3 files changed: +49 -30 lines changed


CHANGELOG.md (+4)

@@ -38,8 +38,12 @@
 * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
 * [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366
+<<<<<<< HEAD
 =======
 >>>>>>> Update for 1.10.0-rc.0 release candidate
+=======
+* [BUGFIX] Querier: After query-frontend restart, querier may have lower than configured concurrency. #4417
+>>>>>>> Fix bug where querier may not be able to achieve max-concurrent (#4417)
 
 ## 1.10.0 / 2021-08-03
 

(The `<<<<<<<`/`=======`/`>>>>>>>` lines are Git merge-conflict markers in CHANGELOG.md itself, not rendering damage: the hunk's context lines show two markers already present in the file on this branch, and the added lines introduce more around the new bugfix entry.)

pkg/querier/worker/worker.go (+2)

@@ -208,6 +208,8 @@ func (w *querierWorker) AddressRemoved(address string) {
 	w.mu.Lock()
 	p := w.managers[address]
 	delete(w.managers, address)
+	// Called with lock.
+	w.resetConcurrency()
 	w.mu.Unlock()
 
 	if p != nil {
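
The entire fix in worker.go is the resetConcurrency() call above: when a frontend address is removed, the worker now redistributes its configured concurrency across the remaining targets, instead of leaving the querier running fewer processors than max-concurrent until the next restart. The test cases below pin down the distribution rule for the "total parallelism" mode: divide max-concurrent across the targets, give the remainder to the first targets, and never run fewer than one processor per target. Here is a minimal, self-contained sketch of that rule; distributeConcurrency is a hypothetical name used for illustration, not the actual Cortex function (the real logic lives in querierWorker.resetConcurrency):

	package main

	import "fmt"

	// distributeConcurrency sketches the rebalancing rule the tests below
	// encode: split maxConcurrent evenly across targets, hand the remainder
	// to the first targets, and floor each target at one processor.
	// Hypothetical helper, not the actual Cortex implementation.
	func distributeConcurrency(maxConcurrent, numTargets int) []int {
		perTarget := make([]int, numTargets)
		for i := range perTarget {
			c := maxConcurrent / numTargets
			if i < maxConcurrent%numTargets {
				c++ // spread the remainder over the first targets
			}
			if c < 1 {
				c = 1 // always keep at least one processor per target
			}
			perTarget[i] = c
		}
		return perTarget
	}

	func main() {
		fmt.Println(distributeConcurrency(7, 4)) // [2 2 2 1]     -> total 7 (the "remainder" case)
		fmt.Println(distributeConcurrency(3, 6)) // [1 1 1 1 1 1] -> total 6 ("at least one per target")
	}

In the per-target mode (parallelism set, max-concurrent matching off), each target simply runs parallelism processors, so removing a target is expected to lower the total, as the second test case asserts.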

pkg/querier/worker/worker_test.go (+43 -30)

@@ -3,6 +3,7 @@ package worker
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -17,46 +18,52 @@ import (
 
 func TestResetConcurrency(t *testing.T) {
 	tests := []struct {
-		name                string
-		parallelism         int
-		maxConcurrent       int
-		numTargets          int
-		expectedConcurrency int
+		name                                  string
+		parallelism                           int
+		maxConcurrent                         int
+		numTargets                            int
+		expectedConcurrency                   int
+		expectedConcurrencyAfterTargetRemoval int
 	}{
 		{
-			name:                "Test create at least one processor per target",
-			parallelism:         0,
-			maxConcurrent:       0,
-			numTargets:          2,
-			expectedConcurrency: 2,
+			name:                                  "Test create at least one processor per target",
+			parallelism:                           0,
+			maxConcurrent:                         0,
+			numTargets:                            2,
+			expectedConcurrency:                   2,
+			expectedConcurrencyAfterTargetRemoval: 1,
 		},
 		{
-			name:                "Test parallelism per target",
-			parallelism:         4,
-			maxConcurrent:       0,
-			numTargets:          2,
-			expectedConcurrency: 8,
+			name:                                  "Test parallelism per target",
+			parallelism:                           4,
+			maxConcurrent:                         0,
+			numTargets:                            2,
+			expectedConcurrency:                   8,
+			expectedConcurrencyAfterTargetRemoval: 4,
 		},
 		{
-			name:                "Test Total Parallelism with a remainder",
-			parallelism:         1,
-			maxConcurrent:       7,
-			numTargets:          4,
-			expectedConcurrency: 7,
+			name:                                  "Test Total Parallelism with a remainder",
+			parallelism:                           1,
+			maxConcurrent:                         7,
+			numTargets:                            4,
+			expectedConcurrency:                   7,
+			expectedConcurrencyAfterTargetRemoval: 7,
 		},
 		{
-			name:                "Test Total Parallelism dividing evenly",
-			parallelism:         1,
-			maxConcurrent:       6,
-			numTargets:          2,
-			expectedConcurrency: 6,
+			name:                                  "Test Total Parallelism dividing evenly",
+			parallelism:                           1,
+			maxConcurrent:                         6,
+			numTargets:                            2,
+			expectedConcurrency:                   6,
+			expectedConcurrencyAfterTargetRemoval: 6,
 		},
 		{
-			name:                "Test Total Parallelism at least one worker per target",
-			parallelism:         1,
-			maxConcurrent:       3,
-			numTargets:          6,
-			expectedConcurrency: 6,
+			name:                                  "Test Total Parallelism at least one worker per target",
+			parallelism:                           1,
+			maxConcurrent:                         3,
+			numTargets:                            6,
+			expectedConcurrency:                   6,
+			expectedConcurrencyAfterTargetRemoval: 5,
 		},
 	}
 
@@ -82,6 +89,12 @@ func TestResetConcurrency(t *testing.T) {
 			return getConcurrentProcessors(w)
 		})
 
+		// now we remove an address and ensure we still have the expected concurrency
+		w.AddressRemoved(fmt.Sprintf("127.0.0.1:%d", rand.Intn(tt.numTargets)))
+		test.Poll(t, 250*time.Millisecond, tt.expectedConcurrencyAfterTargetRemoval, func() interface{} {
+			return getConcurrentProcessors(w)
+		})
+
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), w))
 		assert.Equal(t, 0, getConcurrentProcessors(w))
 	})
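
The added expectedConcurrencyAfterTargetRemoval values are the same distribution rule applied to one fewer target. Re-using the hypothetical distributeConcurrency sketch from above:

	fmt.Println(distributeConcurrency(7, 3)) // [3 2 2]     -> total still 7
	fmt.Println(distributeConcurrency(6, 1)) // [6]         -> total still 6
	fmt.Println(distributeConcurrency(3, 5)) // [1 1 1 1 1] -> total drops to 5

and in the per-target mode, parallelism 4 over the single remaining target gives 4. Before this commit, AddressRemoved never rebalanced, so after a removal the surviving managers would presumably keep their old shares; the second test.Poll on the post-removal concurrency is what guards against that regression.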
