Distributor limits #4071

Merged 8 commits on Apr 15, 2021.
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
* [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
* [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
* [ENHANCEMENT] Distributor: added per-distributor limits: max number of inflight requests (`-distributor.max-inflight-push-requests`) and max ingestion rate in samples/sec (`-distributor.max-ingestion-rate`). If not set, these two are unlimited. Also added metrics to expose current values (`cortex_distributor_inflight_push_requests`, `cortex_distributor_ingestion_rate_samples_per_second`) as well as limits (`cortex_distributor_inflight_push_requests_limit`, `cortex_distributor_ingestion_rate_samples_per_second_limit`). #4071
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -575,6 +575,18 @@ ring:
# Name of network interface to read address from.
# CLI flag: -distributor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# Max ingestion rate (samples/sec) that distributor will accept. This limit is
# per-distributor, not per-tenant. Additional push requests will be rejected.
# Current ingestion rate is computed as exponentially weighted moving average,
# updated every second. 0 = unlimited.
# CLI flag: -distributor.max-ingestion-rate
[max_ingestion_rate: <float> | default = 0]

# Max inflight push requests that this distributor can handle. Additional
# requests will be rejected. 0 = unlimited.
# CLI flag: -distributor.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
```
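
As a reading aid, the moving average mentioned in the `max_ingestion_rate` description can be written as follows. This is a sketch inferred from the smoothing factor `0.2` and the one-second tick interval that appear in this change, and it assumes the first tick seeds the rate directly; the symbols $r_t$, $x_t$ and $\alpha$ are introduced here for illustration only.

$$
r_0 = x_0, \qquad r_t = r_{t-1} + \alpha\,(x_t - r_{t-1}), \qquad \alpha = 0.2
$$

Here $x_t$ is the number of samples accepted during second $t$ and $r_t$ is the rate after the tick that closes that second. With a non-zero limit, a push is rejected while $r_t \ge$ `max_ingestion_rate`. For example, $r_{t-1} = 500$ and $x_t = 1000$ give $r_t = 500 + 0.2\,(1000 - 500) = 600$.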

### `ingester_config`
66 changes: 64 additions & 2 deletions pkg/distributor/distributor.go
@@ -22,6 +22,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -32,7 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/limiter"
- "github.com/cortexproject/cortex/pkg/util/math"
+ util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)
@@ -45,11 +46,17 @@ var (
// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

// Distributor instance limits errors.
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
)

const (
typeSamples = "samples"
typeMetadata = "metadata"

instanceIngestionRateTickInterval = time.Second
)

// Distributor is a storage.SampleAppender and a client.Querier which
@@ -79,6 +86,9 @@ type Distributor struct {

activeUsers *util.ActiveUsersCleanupService

ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64

// Metrics
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
@@ -123,6 +133,10 @@ type Config struct {

// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// Limits for distributor
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -137,6 +151,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
f.Float64Var(&cfg.MaxIngestionRate, "distributor.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.MaxInflightPushRequests, "distributor.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. Additional requests will be rejected. 0 = unlimited.")
}

// Validate config and returns error on failure
@@ -200,6 +216,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
@@ -273,6 +290,29 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),
}

promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_inflight_push_requests",
Help: "Current number of inflight push requests in distributor.",
}, func() float64 {
return float64(d.inflightPushRequests.Load())
})
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_distributor_inflight_push_requests_limit",
Help: "Limit for inflight push requests",
}).Set(float64(cfg.MaxInflightPushRequests))

promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_ingestion_rate_samples_per_second",
Help: "Current ingestion rate in samples/sec that distributor is using to limit access.",
}, func() float64 {
return d.ingestionRate.Rate()
})
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_distributor_ingestion_rate_samples_per_second_limit",
Help: "Limit for Distributor's ingestion rate.",
}).Set(cfg.MaxIngestionRate)

d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)

@@ -294,11 +334,17 @@ func (d *Distributor) starting(ctx context.Context) error {
}

func (d *Distributor) running(ctx context.Context) error {
ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
defer ingestionRateTicker.Stop()

for {
select {
case <-ctx.Done():
return nil

case <-ingestionRateTicker.C:
d.ingestionRate.Tick()

case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
@@ -443,6 +489,20 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, err
}

// We will report *this* request in the error too.
inflight := d.inflightPushRequests.Inc()
defer d.inflightPushRequests.Dec()

if d.cfg.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.MaxInflightPushRequests) {
return nil, errTooManyInflightPushRequests
}

if d.cfg.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= d.cfg.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
}

now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now)

@@ -508,7 +568,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
for _, ts := range req.Timeseries {
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
if len(ts.Samples) > 0 {
- latestSampleTimestampMs = math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
+ latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}

if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
@@ -603,6 +663,8 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
}

d.ingestionRate.Add(int64(validatedSamples))

subRing := d.ingestersRing

// Obtain a subring if required.
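
The inflight check added to `Push` above follows an increment, check, and deferred decrement pattern. The sketch below isolates that pattern. It is an illustration, not code from this PR: `inflightLimiter`, `do` and `errTooManyInflight` are hypothetical names, and it uses the standard library's `sync/atomic` (`atomic.Int64`, available since Go 1.19) in place of the `go.uber.org/atomic` package imported in the diff.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errTooManyInflight plays the role of errTooManyInflightPushRequests in the diff.
var errTooManyInflight = errors.New("too many inflight push requests")

// inflightLimiter is a hypothetical helper, not a type from Cortex.
type inflightLimiter struct {
	inflight atomic.Int64 // requests currently being handled
	max      int64        // 0 means unlimited
}

// do runs fn if the limit allows it. The request is counted before the check,
// so the value compared against the limit already includes this request.
func (l *inflightLimiter) do(fn func() error) error {
	current := l.inflight.Add(1)
	defer l.inflight.Add(-1) // always released, even when the request is rejected

	if l.max > 0 && current > l.max {
		return errTooManyInflight
	}
	return fn()
}

func main() {
	l := &inflightLimiter{max: 1}

	// The outer call holds the only slot, so the nested call is rejected.
	err := l.do(func() error {
		return l.do(func() error { return nil })
	})
	fmt.Println(err)               // too many inflight push requests
	fmt.Println(l.inflight.Load()) // 0
}
```

Because the counter is incremented before the comparison, a limit of `N` admits exactly `N` concurrent requests, and rejected requests never leak a slot since the decrement is deferred.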
154 changes: 154 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -446,6 +446,156 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushInstanceLimits(t *testing.T) {
type testPush struct {
samples int
metadata int
expectedError error
}

tests := map[string]struct {
preInflight int
preRateSamples int // initial rate before first push
pushes []testPush // rate is recomputed after each push

// limits
inflightLimit int
ingestionRateLimit float64

metricNames []string
expectedMetrics string
}{
"no limits limit": {
preInflight: 100,
preRateSamples: 1000,

pushes: []testPush{
{samples: 100, expectedError: nil},
},

metricNames: []string{"cortex_distributor_inflight_push_requests_limit", "cortex_distributor_ingestion_rate_samples_per_second_limit"},
expectedMetrics: `
# HELP cortex_distributor_inflight_push_requests_limit Limit for inflight push requests
# TYPE cortex_distributor_inflight_push_requests_limit gauge
cortex_distributor_inflight_push_requests_limit 0

# HELP cortex_distributor_ingestion_rate_samples_per_second_limit Limit for Distributor's ingestion rate.
# TYPE cortex_distributor_ingestion_rate_samples_per_second_limit gauge
cortex_distributor_ingestion_rate_samples_per_second_limit 0
`,
},
"below inflight limit": {
preInflight: 100,
inflightLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: nil},
},

metricNames: []string{"cortex_distributor_inflight_push_requests_limit", "cortex_distributor_inflight_push_requests"},
expectedMetrics: `
# HELP cortex_distributor_inflight_push_requests Current number of inflight push requests in distributor.
# TYPE cortex_distributor_inflight_push_requests gauge
cortex_distributor_inflight_push_requests 100

# HELP cortex_distributor_inflight_push_requests_limit Limit for inflight push requests
# TYPE cortex_distributor_inflight_push_requests_limit gauge
cortex_distributor_inflight_push_requests_limit 101
`,
},
"hits inflight limit": {
preInflight: 101,
inflightLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightPushRequests},
},
},
"below ingestion rate limit": {
preRateSamples: 500,
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 1000, expectedError: nil},
},

metricNames: []string{"cortex_distributor_ingestion_rate_samples_per_second_limit", "cortex_distributor_ingestion_rate_samples_per_second"},
expectedMetrics: `
# HELP cortex_distributor_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that distributor is using to limit access.
# TYPE cortex_distributor_ingestion_rate_samples_per_second gauge
cortex_distributor_ingestion_rate_samples_per_second 600

# HELP cortex_distributor_ingestion_rate_samples_per_second_limit Limit for Distributor's ingestion rate.
# TYPE cortex_distributor_ingestion_rate_samples_per_second_limit gauge
cortex_distributor_ingestion_rate_samples_per_second_limit 1000
`,
},
"hits rate limit on first request, but second request can proceed": {
preRateSamples: 1200,
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 100, expectedError: errMaxSamplesPushRateLimitReached},
{samples: 100, expectedError: nil},
},
},

"below rate limit on first request, but hits the rate limit afterwards": {
preRateSamples: 500,
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Start all expected distributors
distributors, _, r, regs := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
maxInflightRequests: testData.inflightLimit,
maxIngestionRate: testData.ingestionRateLimit,
})
defer stopAll(distributors, r)

d := distributors[0]
d.inflightPushRequests.Add(int64(testData.preInflight))
d.ingestionRate.Add(int64(testData.preRateSamples))

d.ingestionRate.Tick()

for _, push := range testData.pushes {
request := makeWriteRequest(0, push.samples, push.metadata)
_, err := d.Push(ctx, request)

if push.expectedError == nil {
assert.Nil(t, err)
} else {
assert.Equal(t, push.expectedError, err)
}

d.ingestionRate.Tick()

if testData.expectedMetrics != "" {
assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(testData.expectedMetrics), testData.metricNames...))
}
}
})
}
}
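
The arithmetic in the comments of the last test case can be replayed outside the test harness. The sketch below is a simplified, single-goroutine stand-in written for illustration: `ewma` and `simulate` are hypothetical names, not the `util_math.EwmaRate` implementation, and it assumes a one-second tick, a smoothing factor of 0.2, and that the first tick seeds the rate directly. A rejected push adds no samples, so the rate decays on the following tick.

```go
package main

import "fmt"

// ewma is a simplified stand-in for the rate tracker used by the distributor.
type ewma struct {
	alpha   float64
	rate    float64
	pending int64 // samples added since the previous tick
	init    bool
}

func (e *ewma) add(n int64) { e.pending += n }

// tick folds the samples seen since the previous tick into the rate.
// With a one-second interval, pending samples equal samples per second.
func (e *ewma) tick() {
	instant := float64(e.pending)
	e.pending = 0
	if !e.init {
		e.rate, e.init = instant, true
		return
	}
	e.rate += e.alpha * (instant - e.rate)
}

// simulate replays pushes of the given sizes, ticking once after each push.
// It returns whether each push was accepted and the rate after each tick.
func simulate(preRate int64, limit float64, pushes []int64) ([]bool, []float64) {
	e := &ewma{alpha: 0.2}
	e.add(preRate)
	e.tick()

	accepted := make([]bool, 0, len(pushes))
	rates := make([]float64, 0, len(pushes))
	for _, samples := range pushes {
		ok := limit <= 0 || e.rate < limit // reject when rate >= limit
		if ok {
			e.add(samples)
		}
		e.tick()
		accepted = append(accepted, ok)
		rates = append(rates, e.rate)
	}
	return accepted, rates
}

func main() {
	accepted, rates := simulate(500, 1000, []int64{5000, 5000, 5000, 5000})
	for i := range accepted {
		fmt.Printf("push %d: accepted=%v rate=%.1f\n", i+1, accepted[i], rates[i])
	}
}
```

Run with the values from the test case (initial rate 500, limit 1000, four pushes of 5000 samples), this prints accepted, rejected, rejected, accepted, with rates 1400.0, 1120.0, 896.0 and 1716.8, matching the inline comments.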

func TestDistributor_PushHAInstances(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

@@ -1478,6 +1628,8 @@ type prepConfig struct {
limits *validation.Limits
numDistributors int
skipLabelNameValidation bool
maxInflightRequests int
maxIngestionRate float64
}

func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring, []*prometheus.Registry) {
@@ -1558,6 +1710,8 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
distributorCfg.DistributorRing.KVStore.Mock = kvStore
distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1"
distributorCfg.SkipLabelNameValidation = cfg.skipLabelNameValidation
distributorCfg.MaxInflightPushRequests = cfg.maxInflightRequests
distributorCfg.MaxIngestionRate = cfg.maxIngestionRate

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
6 changes: 4 additions & 2 deletions pkg/ingester/ingester_v2.go
@@ -52,6 +52,8 @@ const (

// Jitter applied to the idle timeout to prevent compaction in all ingesters concurrently.
compactionIdleTimeoutJitter = 0.25

instanceIngestionRateTickInterval = time.Second
)

// Shipper interface is used to have an easy way to mock it in tests.
@@ -474,7 +476,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
wal: &noopWAL{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
- ingestionRate: util_math.NewEWMARate(0.2, cfg.RateUpdatePeriod),
+ ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
}
i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)

@@ -633,7 +635,7 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
defer rateUpdateTicker.Stop()

- ingestionRateTicker := time.NewTicker(1 * time.Second)
+ ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
defer ingestionRateTicker.Stop()

var activeSeriesTickerChan <-chan time.Time