Skip to content

Monitor and protect distributor from OOMKilled due to too many in progress requests #5917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
* [ENHANCEMENT] Ingester: Allowing to configure `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919
* [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to ingester client to protect distributor from OOMKilled. #5917
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906

## 1.17.0 2024-04-30
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3026,6 +3026,11 @@ grpc_client_config:
# Skip validating server certificate.
# CLI flag: -ingester.client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]

# Max inflight push requests that this ingester client can handle. This limit is
# per-ingester-client. Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.client.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
```

### `limits_config`
Expand Down
63 changes: 53 additions & 10 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
Expand All @@ -20,6 +22,19 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra
Help: "Time spent doing Ingester requests.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})
var ingesterClientInflightPushRequests = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "ingester_client_inflight_push_requests",
Help: "Number of Ingester client push requests.",
}, []string{"ingester"})

var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client")

// ClosableClientConn is grpc.ClientConnInterface with Close function
type ClosableClientConn interface {
grpc.ClientConnInterface
Close() error
}

// HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient.
type HealthAndIngesterClient interface {
Expand All @@ -32,16 +47,40 @@ type HealthAndIngesterClient interface {
type closableHealthAndIngesterClient struct {
IngesterClient
grpc_health_v1.HealthClient
conn *grpc.ClientConn
conn ClosableClientConn
maxInflightPushRequests int64
inflightRequests atomic.Int64
inflightPushRequests prometheus.Gauge
}

func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
out := new(cortexpb.WriteResponse)
err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
if err != nil {
return nil, err
return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) {
out := new(cortexpb.WriteResponse)
err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
})
}

func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) {
return c.IngesterClient.Push(ctx, in, opts...)
})
}

func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) {
currentInflight := c.inflightRequests.Inc()
c.inflightPushRequests.Inc()
defer func() {
c.inflightPushRequests.Dec()
c.inflightRequests.Dec()
}()
if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests {
return nil, errTooManyInflightPushRequests
}
return out, nil
return mainFunc()
}

// MakeIngesterClient makes a new IngesterClient
Expand All @@ -55,9 +94,11 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error
return nil, err
}
return &closableHealthAndIngesterClient{
IngesterClient: NewIngesterClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
IngesterClient: NewIngesterClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
maxInflightPushRequests: cfg.MaxInflightPushRequests,
inflightPushRequests: ingesterClientInflightPushRequests.WithLabelValues(addr),
}, nil
}

Expand All @@ -67,12 +108,14 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client
type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
f.Int64Var(&cfg.MaxInflightPushRequests, "ingester.client.max-inflight-push-requests", 0, "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited.")
}

func (cfg *Config) Validate(log log.Logger) error {
Expand Down
83 changes: 83 additions & 0 deletions pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"strconv"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -47,3 +50,83 @@ func TestMarshall(t *testing.T) {
require.Equal(t, numSeries, len(req.Timeseries))
}
}

func TestClosableHealthAndIngesterClient_MaxInflightPushRequests(t *testing.T) {
t.Parallel()

tests := map[string]struct {
inflightPushRequests int64
maxInflightPushRequests int64
expectThrottle bool
}{
"no limit": {
inflightPushRequests: 1000,
maxInflightPushRequests: 0,
expectThrottle: false,
},
"inflight request is under limit": {
inflightPushRequests: 99,
maxInflightPushRequests: 100,
expectThrottle: false,
},
"inflight request hits limit": {
inflightPushRequests: 100,
maxInflightPushRequests: 100,
expectThrottle: true,
},
}
ctx := context.Background()
for testName, testData := range tests {
tData := testData
t.Run(testName, func(t *testing.T) {
t.Parallel()

client1 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests)
_, err := client1.Push(ctx, nil)
if tData.expectThrottle {
assert.ErrorIs(t, err, errTooManyInflightPushRequests)
} else {
assert.NoError(t, err)
}

client2 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests)
_, err = client2.PushPreAlloc(ctx, nil)
if tData.expectThrottle {
assert.ErrorIs(t, err, errTooManyInflightPushRequests)
} else {
assert.NoError(t, err)
}
})
}
}

func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequests int64) *closableHealthAndIngesterClient {
client := &closableHealthAndIngesterClient{
IngesterClient: &mockIngester{},
conn: &mockClientConn{},
maxInflightPushRequests: maxInflightPushRequests,
inflightPushRequests: prometheus.NewGauge(prometheus.GaugeOpts{}),
}
client.inflightRequests.Add(currentInflightRequests)
return client
}

type mockIngester struct {
IngesterClient
}

func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}

type mockClientConn struct {
ClosableClientConn
}

func (m *mockClientConn) Invoke(_ context.Context, _ string, _ any, _ any, _ ...grpc.CallOption) error {
return nil
}

func (m *mockClientConn) Close() error {
return nil
}
Loading