Skip to content

Monitor and protect distributor from OOMKilled due to too many in progress requests #5917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Distributor struct {
ingesterQueryFailures *prometheus.CounterVec
replicationFactor prometheus.Gauge
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
inflightPushRequestsCount prometheus.Gauge
}

// Config contains the configuration required to
Expand Down Expand Up @@ -344,6 +345,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),
inflightPushRequestsCount: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_distributor_inflight_push_requests",
Help: "Current number of inflight push requests in distributor.",
}),
}

promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Expand All @@ -357,12 +362,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
ConstLabels: map[string]string{limitLabel: "max_ingestion_rate"},
}).Set(cfg.InstanceLimits.MaxIngestionRate)

promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_inflight_push_requests",
Help: "Current number of inflight push requests in distributor.",
}, func() float64 {
return float64(d.inflightPushRequests.Load())
})
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_ingestion_rate_samples_per_second",
Help: "Current ingestion rate in samples/sec that distributor is using to limit access.",
Expand Down Expand Up @@ -591,7 +590,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

// We will report *this* request in the error too.
inflight := d.inflightPushRequests.Inc()
defer d.inflightPushRequests.Dec()
d.inflightPushRequestsCount.Inc()
defer func() {
d.inflightPushRequestsCount.Dec()
d.inflightPushRequests.Dec()
}()

now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now)
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error
return nil, err
}
return &closableHealthAndIngesterClient{
IngesterClient: NewIngesterClient(conn),
IngesterClient: NewIngesterClient(conn, cfg.MaxInflightPushRequests),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
}, nil
Expand All @@ -67,12 +67,14 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client.
type Config struct {
	// GRPCClientConfig holds the gRPC client settings. Its flags are
	// registered under the "ingester.client" prefix.
	GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

	// MaxInflightPushRequests is the per-ingester-client limit on inflight
	// push requests; additional requests are rejected. 0 means unlimited.
	MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
}

// RegisterFlags wires the ingester client settings into the given flag set:
// first the gRPC client flags (under the "ingester.client" prefix), then the
// per-client inflight push request limit.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	const (
		flagPrefix        = "ingester.client"
		maxInflightFlag   = "ingester.client.max-inflight-push-requests"
		maxInflightUsage  = "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited."
		unlimitedInflight = int64(0)
	)

	cfg.GRPCClientConfig.RegisterFlagsWithPrefix(flagPrefix, f)
	f.Int64Var(&cfg.MaxInflightPushRequests, maxInflightFlag, unlimitedInflight, maxInflightUsage)
}

func (cfg *Config) Validate(log log.Logger) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/client/cortex_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestStreamingSends(t *testing.T) {
require.NoError(t, server.Serve(listen))
}()

client := NewIngesterClient(conn)
client := NewIngesterClient(conn, 0)
err = testData.clientRecv(clientCtx, client)
assert.Equal(t, true, grpcutil.IsGRPCContextCanceled(err))

Expand Down
29 changes: 27 additions & 2 deletions pkg/ingester/client/ingester.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading