Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where querier may not be able to achieve max-concurrent #4417

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366

* [BUGFIX] Querier: Fix bug where querier may not be able to achieve max-concurrent. #4417

## 1.10.0 / 2021-08-03

Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func (w *querierWorker) AddressRemoved(address string) {
w.mu.Lock()
p := w.managers[address]
delete(w.managers, address)
// Called with lock.
w.resetConcurrency()
w.mu.Unlock()

if p != nil {
Expand Down
73 changes: 43 additions & 30 deletions pkg/querier/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"fmt"
"math/rand"
"testing"
"time"

Expand All @@ -17,46 +18,52 @@ import (

func TestResetConcurrency(t *testing.T) {
tests := []struct {
name string
parallelism int
maxConcurrent int
numTargets int
expectedConcurrency int
name string
parallelism int
maxConcurrent int
numTargets int
expectedConcurrency int
exepctedConcurrencyAfterTargetRemoval int
}{
{
name: "Test create at least one processor per target",
parallelism: 0,
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 2,
name: "Test create at least one processor per target",
parallelism: 0,
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 2,
exepctedConcurrencyAfterTargetRemoval: 1,
},
{
name: "Test parallelism per target",
parallelism: 4,
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 8,
name: "Test parallelism per target",
parallelism: 4,
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 8,
exepctedConcurrencyAfterTargetRemoval: 4,
},
{
name: "Test Total Parallelism with a remainder",
parallelism: 1,
maxConcurrent: 7,
numTargets: 4,
expectedConcurrency: 7,
name: "Test Total Parallelism with a remainder",
parallelism: 1,
maxConcurrent: 7,
numTargets: 4,
expectedConcurrency: 7,
exepctedConcurrencyAfterTargetRemoval: 7,
},
{
name: "Test Total Parallelism dividing evenly",
parallelism: 1,
maxConcurrent: 6,
numTargets: 2,
expectedConcurrency: 6,
name: "Test Total Parallelism dividing evenly",
parallelism: 1,
maxConcurrent: 6,
numTargets: 2,
expectedConcurrency: 6,
exepctedConcurrencyAfterTargetRemoval: 6,
},
{
name: "Test Total Parallelism at least one worker per target",
parallelism: 1,
maxConcurrent: 3,
numTargets: 6,
expectedConcurrency: 6,
name: "Test Total Parallelism at least one worker per target",
parallelism: 1,
maxConcurrent: 3,
numTargets: 6,
expectedConcurrency: 6,
exepctedConcurrencyAfterTargetRemoval: 5,
},
}

Expand All @@ -82,6 +89,12 @@ func TestResetConcurrency(t *testing.T) {
return getConcurrentProcessors(w)
})

// now we remove an address and ensure we still have the expected concurrency
w.AddressRemoved(fmt.Sprintf("127.0.0.1:%d", rand.Intn(tt.numTargets)))
test.Poll(t, 250*time.Millisecond, tt.exepctedConcurrencyAfterTargetRemoval, func() interface{} {
return getConcurrentProcessors(w)
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), w))
assert.Equal(t, 0, getConcurrentProcessors(w))
})
Expand Down