
Commit 1ddb423

pracucci and pstibrany authored
Fix queriers shuffle-sharding blast radius containment (#3901)
* Added forget timeout support to queues
* Added notify shutdown rpc to query-frontend and query-scheduler proto
* Querier worker notifies shutdown to query-frontend/scheduler
* Log when query-frontend/scheduler receives a shutdown notification
* Added config option to configure the forget timeout
* Fixed re-connect while in forget waiting period
* Fixed unit tests
* Fixed GetNextRequestForQuerier() when a resharding happens after forgetting a querier
* Update pkg/frontend/v1/frontend.go
* Update pkg/scheduler/queue/user_queues.go
* Update pkg/scheduler/queue/user_queues.go
* Update pkg/scheduler/scheduler.go
* Update pkg/querier/worker/frontend_processor.go
* Updated comment based on review feedback
* Updated comment based on review feedback
* Updated generated doc
* Added name to services
* Moved forgetCheckPeriod where it's used
* Added queues forget timeout unit tests
* Added RequestQueue unit test
* Renamed querier forget timeout into delay
* Added timeout to the notify shutdown notification
* Updated doc
* Added CHANGELOG entry
* Update pkg/scheduler/scheduler.go
* Update pkg/frontend/v1/frontend.go
* Updated doc

Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Peter Štibraný <[email protected]>
1 parent 40022ad commit 1ddb423

22 files changed (+1513 −166 lines)

CHANGELOG.md (+1)

@@ -6,6 +6,7 @@
 * [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
   * `cortex_ruler_clients`
   * `cortex_ruler_client_request_duration_seconds`
+* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901

 ## 1.8.0 in progress

docs/configuration/config-file-reference.md (+14)

@@ -589,6 +589,13 @@ query_scheduler:
 # CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
 [max_outstanding_requests_per_tenant: <int> | default = 100]

+# If a querier disconnects without sending notification about graceful
+# shutdown, the query-scheduler will keep the querier in the tenant's shard
+# until the forget delay has passed. This feature is useful to reduce the
+# blast radius when shuffle-sharding is enabled.
+# CLI flag: -query-scheduler.querier-forget-delay
+[querier_forget_delay: <duration> | default = 0s]
+
 # This configures the gRPC client used to report errors back to the
 # query-frontend.
 grpc_client_config:

@@ -1322,6 +1329,13 @@ The `query_frontend_config` configures the Cortex query-frontend.
 # CLI flag: -querier.max-outstanding-requests-per-tenant
 [max_outstanding_per_tenant: <int> | default = 100]

+# If a querier disconnects without sending notification about graceful shutdown,
+# the query-frontend will keep the querier in the tenant's shard until the
+# forget delay has passed. This feature is useful to reduce the blast radius
+# when shuffle-sharding is enabled.
+# CLI flag: -query-frontend.querier-forget-delay
+[querier_forget_delay: <duration> | default = 0s]
+
 # DNS hostname used for finding query-schedulers.
 # CLI flag: -frontend.scheduler-address
 [scheduler_address: <string> | default = ""]

docs/guides/shuffle-sharding.md (+9)

@@ -125,6 +125,15 @@ Note that this distribution happens in query-frontend, or query-scheduler if use

 _The maximum number of queriers can be overridden on a per-tenant basis in the limits overrides configuration._

+#### The impact of "query of death"
+
+In the event a tenant is repeatedly sending a "query of death" which leads the querier to crash or get killed because of out-of-memory, the crashed querier will get disconnected from the query-frontend or query-scheduler and a new querier will be immediately assigned to the tenant's shard. This practically invalidates the assumption that shuffle-sharding can be used to contain the blast radius in case of a query of death.
+
+To mitigate it, Cortex allows configuring a delay between when a querier disconnects because of a crash and when the crashed querier is actually removed from the tenant's shard (and another healthy querier is added as a replacement). A delay of 1 minute may be a reasonable trade-off:
+
+- Query-frontend: `-query-frontend.querier-forget-delay=1m`
+- Query-scheduler: `-query-scheduler.querier-forget-delay=1m`
+
 ### Store-gateway shuffle sharding

 The Cortex store-gateway -- used by the [blocks storage](../blocks-storage/_index.md) -- by default spreads each tenant's blocks across all running store-gateways.
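
The mechanism described in this doc change amounts to per-querier bookkeeping inside the request queue: an abrupt disconnection only marks the querier as potentially dead, and the querier is dropped from tenants' shards once the forget delay elapses, unless it reconnects or announces a graceful shutdown first. The following standalone Go sketch illustrates that idea; all type and function names here are illustrative and are not the actual `RequestQueue`/`queues` code touched by this commit.

```go
package main

import (
	"fmt"
	"time"
)

// querierState is a toy stand-in for the per-querier bookkeeping the queue
// keeps; field names are illustrative, not Cortex's.
type querierState struct {
	connections    int
	disconnectedAt time.Time // zero while at least one connection is open
}

type querierTracker struct {
	forgetDelay time.Duration
	queriers    map[string]*querierState
}

func (t *querierTracker) connect(id string) {
	q, ok := t.queriers[id]
	if !ok {
		q = &querierState{}
		t.queriers[id] = q
	}
	q.connections++
	q.disconnectedAt = time.Time{} // reconnecting cancels a pending forget
}

// disconnect records an abrupt disconnection: with a forget delay configured,
// the querier stays tracked (and so stays in tenants' shards) until
// forgetUnhealthy removes it later.
func (t *querierTracker) disconnect(id string, now time.Time) {
	q, ok := t.queriers[id]
	if !ok {
		return
	}
	q.connections--
	if q.connections <= 0 {
		if t.forgetDelay == 0 {
			delete(t.queriers, id) // previous behaviour: forget immediately
			return
		}
		q.disconnectedAt = now
	}
}

// notifyShutdown is the graceful path: the querier told us it is going away
// on purpose, so there is no reason to wait for the delay.
func (t *querierTracker) notifyShutdown(id string) {
	delete(t.queriers, id)
}

// forgetUnhealthy is meant to run periodically and drops queriers whose
// forget delay has fully elapsed.
func (t *querierTracker) forgetUnhealthy(now time.Time) {
	for id, q := range t.queriers {
		if q.connections <= 0 && !q.disconnectedAt.IsZero() && now.Sub(q.disconnectedAt) >= t.forgetDelay {
			delete(t.queriers, id)
		}
	}
}

func main() {
	t := &querierTracker{forgetDelay: time.Minute, queriers: map[string]*querierState{}}
	t.connect("querier-1")

	start := time.Now()
	t.disconnect("querier-1", start) // e.g. OOM-killed by a "query of death"

	t.forgetUnhealthy(start.Add(30 * time.Second))
	fmt.Println(len(t.queriers)) // 1: still in the shard, delay not elapsed yet

	t.forgetUnhealthy(start.Add(2 * time.Minute))
	fmt.Println(len(t.queriers)) // 0: forgotten after the delay
}
```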

pkg/frontend/config.go (+4 −1)

@@ -70,7 +70,10 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i

 	default:
 		// No scheduler = use original frontend.
-		fr := v1.New(cfg.FrontendV1, limits, log, reg)
+		fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
+		if err != nil {
+			return nil, nil, nil, err
+		}
 		return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
 	}
 }

pkg/frontend/v1/frontend.go (+45 −11)

@@ -2,7 +2,6 @@ package v1

 import (
 	"context"
-	"errors"
 	"flag"
 	"fmt"
 	"net/http"
@@ -11,6 +10,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/opentracing/opentracing-go"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/weaveworks/common/httpgrpc"
@@ -31,12 +31,14 @@ var (

 // Config for a Frontend.
 type Config struct {
-	MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
+	MaxOutstandingPerTenant int           `yaml:"max_outstanding_per_tenant"`
+	QuerierForgetDelay      time.Duration `yaml:"querier_forget_delay"`
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
+	f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
 }

 type Limits interface {
@@ -56,6 +58,10 @@ type Frontend struct {
 	requestQueue *queue.RequestQueue
 	activeUsers  *util.ActiveUsersCleanupService

+	// Subservices manager.
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
+
 	// Metrics.
 	queueLength       *prometheus.GaugeVec
 	discardedRequests *prometheus.CounterVec
@@ -74,8 +80,7 @@ type request struct {
 }

 // New creates a new frontend. Frontend implements service, and must be started and stopped.
-func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) *Frontend {
-
+func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
 	f := &Frontend{
 		cfg: cfg,
 		log: log,
@@ -95,26 +100,48 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
 		}),
 	}

-	f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength, f.discardedRequests)
+	f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
 	f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

+	var err error
+	f.subservices, err = services.NewManager(f.requestQueue, f.activeUsers)
+	if err != nil {
+		return nil, err
+	}
+
 	f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "cortex_query_frontend_connected_clients",
 		Help: "Number of worker clients currently connected to the frontend.",
 	}, f.requestQueue.GetConnectedQuerierWorkersMetric)

-	f.Service = services.NewIdleService(f.starting, f.stopping)
-	return f
+	f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
+	return f, nil
 }

 func (f *Frontend) starting(ctx context.Context) error {
-	return services.StartAndAwaitRunning(ctx, f.activeUsers)
+	f.subservicesWatcher.WatchManager(f.subservices)
+
+	if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil {
+		return errors.Wrap(err, "unable to start frontend subservices")
+	}
+
+	return nil
+}
+
+func (f *Frontend) running(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case err := <-f.subservicesWatcher.Chan():
+			return errors.Wrap(err, "frontend subservice failed")
+		}
+	}
 }

 func (f *Frontend) stopping(_ error) error {
-	// Stops new requests and errors out any pending requests.
-	f.requestQueue.Stop()
-	return services.StopAndAwaitTerminated(context.Background(), f.activeUsers)
+	// This will also stop the requests queue, which stops accepting new requests and errors out any pending requests.
+	return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
 }

 func (f *Frontend) cleanupInactiveUserMetrics(user string) {
@@ -258,6 +285,13 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
 	}
 }

+func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
+	level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())
+	f.requestQueue.NotifyQuerierShutdown(req.GetClientID())
+
+	return &frontendv1pb.NotifyClientShutdownResponse{}, nil
+}
+
 func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) {
 	err := server.Send(&frontendv1pb.FrontendToClient{
 		Type: frontendv1pb.GET_ID,
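
The `NotifyClientShutdown` handler above is the server half of the graceful-shutdown path; the querier worker (changed elsewhere in this commit) is the caller. The sketch below is a rough, hypothetical picture of what such a call could look like, assuming the generated `frontendv1pb` client follows the usual gRPC naming (`NewFrontendClient`, a `ClientID` request field) and the import paths shown; it is not copied from the actual worker code. If the notification fails, nothing breaks: the frontend simply falls back to forgetting the querier after the configured delay.

```go
package example

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"google.golang.org/grpc"

	"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
)

// notifyFrontendShutdown is a hypothetical helper showing how a querier worker
// could announce its own graceful shutdown to the query-frontend, so the
// frontend forgets it immediately instead of waiting out the forget delay.
func notifyFrontendShutdown(ctx context.Context, conn *grpc.ClientConn, querierID string, logger log.Logger) {
	// Keep the notification best-effort and bounded in time.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	client := frontendv1pb.NewFrontendClient(conn)

	_, err := client.NotifyClientShutdown(ctx, &frontendv1pb.NotifyClientShutdownRequest{ClientID: querierID})
	if err != nil {
		// Not fatal: the frontend will still forget this querier once the
		// configured forget delay has elapsed.
		level.Warn(logger).Log("msg", "failed to notify query-frontend about shutdown", "err", err)
	}
}
```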

pkg/frontend/v1/frontend_test.go (+3 −2)

@@ -128,7 +128,7 @@ func TestFrontendCheckReady(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			f := &Frontend{
 				log: log.NewNopLogger(),
-				requestQueue: queue.NewRequestQueue(5,
+				requestQueue: queue.NewRequestQueue(5, 0,
 					prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
 					prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
 				),
@@ -243,7 +243,8 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
 	httpListen, err := net.Listen("tcp", "localhost:0")
 	require.NoError(t, err)

-	v1 := New(config, limits{}, logger, reg)
+	v1, err := New(config, limits{}, logger, reg)
+	require.NoError(t, err)
 	require.NotNil(t, v1)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
 	defer func() {
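
Since passing `0` keeps the old behaviour (forget immediately), existing tests only need the extra `NewRequestQueue` argument and the new error check above. For completeness, here is a small hypothetical example of exercising the new flag end-to-end through `Config.RegisterFlags` from the frontend diff; the flag-set name and printed output are illustrative, and the import path assumes the upstream Cortex module.

```go
package main

import (
	"flag"
	"fmt"

	v1 "github.com/cortexproject/cortex/pkg/frontend/v1"
)

func main() {
	// Register the frontend flags on a throwaway FlagSet, as RegisterFlags in
	// the diff above does, then parse the new forget-delay flag.
	fs := flag.NewFlagSet("example", flag.PanicOnError)

	cfg := v1.Config{}
	cfg.RegisterFlags(fs)

	if err := fs.Parse([]string{"-query-frontend.querier-forget-delay=1m"}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.QuerierForgetDelay) // 1m0s
}
```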
