Skip to content

Commit 94b0137

Browse files
pstibranystevesg
andauthored
Distributor limits (#4071)
* Distributor limits. Signed-off-by: Peter Štibraný <[email protected]> * CHANGELOG.md Signed-off-by: Peter Štibraný <[email protected]> * Update pkg/distributor/distributor.go. Fixed formatting. Co-authored-by: Steve Simpson <[email protected]> Signed-off-by: Peter Štibraný <[email protected]> * Fix docs. Signed-off-by: Peter Štibraný <[email protected]> * Address review feedback. Signed-off-by: Peter Štibraný <[email protected]> * Fix tests after metric name changes. Signed-off-by: Peter Štibraný <[email protected]> * Fix CHANGELOG.md entry. Signed-off-by: Peter Štibraný <[email protected]> Co-authored-by: Steve Simpson <[email protected]>
1 parent cf724d7 commit 94b0137

File tree

8 files changed

+268
-12
lines changed

8 files changed

+268
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
3939
* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
4040
* [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
41+
* [ENHANCEMENT] Distributor: added per-distributor limits: max number of inflight requests (`-distributor.instance-limits.max-inflight-push-requests`) and max ingestion rate in samples/sec (`-distributor.instance-limits.max-ingestion-rate`). If not set, these two are unlimited. Also added metrics to expose current values (`cortex_distributor_inflight_push_requests`, `cortex_distributor_ingestion_rate_samples_per_second`) as well as limits (`cortex_distributor_instance_limits` with various `limit` label values). #4071
4142
* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
4243
* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
4344
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013

docs/configuration/config-file-reference.md

+14
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,20 @@ ring:
575575
# Name of network interface to read address from.
576576
# CLI flag: -distributor.ring.instance-interface-names
577577
[instance_interface_names: <list of string> | default = [eth0 en0]]
578+
579+
instance_limits:
580+
# Max ingestion rate (samples/sec) that this distributor will accept. This
581+
# limit is per-distributor, not per-tenant. Additional push requests will be
582+
# rejected. Current ingestion rate is computed as exponentially weighted
583+
# moving average, updated every second. 0 = unlimited.
584+
# CLI flag: -distributor.instance-limits.max-ingestion-rate
585+
[max_ingestion_rate: <float> | default = 0]
586+
587+
# Max inflight push requests that this distributor can handle. This limit is
588+
# per-distributor, not per-tenant. Additional requests will be rejected. 0 =
589+
# unlimited.
590+
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
591+
[max_inflight_push_requests: <int> | default = 0]
578592
```
579593

580594
### `ingester_config`

pkg/distributor/distributor.go

+78-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/weaveworks/common/httpgrpc"
2323
"github.com/weaveworks/common/instrument"
2424
"github.com/weaveworks/common/user"
25+
"go.uber.org/atomic"
2526

2627
"github.com/cortexproject/cortex/pkg/cortexpb"
2728
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -32,7 +33,7 @@ import (
3233
"github.com/cortexproject/cortex/pkg/util"
3334
"github.com/cortexproject/cortex/pkg/util/extract"
3435
"github.com/cortexproject/cortex/pkg/util/limiter"
35-
"github.com/cortexproject/cortex/pkg/util/math"
36+
util_math "github.com/cortexproject/cortex/pkg/util/math"
3637
"github.com/cortexproject/cortex/pkg/util/services"
3738
"github.com/cortexproject/cortex/pkg/util/validation"
3839
)
@@ -45,11 +46,17 @@ var (
4546
// Validation errors.
4647
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
4748
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
49+
50+
// Distributor instance limits errors.
51+
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
52+
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
4853
)
4954

5055
const (
5156
typeSamples = "samples"
5257
typeMetadata = "metadata"
58+
59+
instanceIngestionRateTickInterval = time.Second
5360
)
5461

5562
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -79,6 +86,9 @@ type Distributor struct {
7986

8087
activeUsers *util.ActiveUsersCleanupService
8188

89+
ingestionRate *util_math.EwmaRate
90+
inflightPushRequests atomic.Int64
91+
8292
// Metrics
8393
queryDuration *instrument.HistogramCollector
8494
receivedSamples *prometheus.CounterVec
@@ -123,6 +133,14 @@ type Config struct {
123133

124134
// This config is dynamically injected because defined in the querier config.
125135
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`
136+
137+
// Limits for distributor
138+
InstanceLimits InstanceLimits `yaml:"instance_limits"`
139+
}
140+
141+
type InstanceLimits struct {
142+
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
143+
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
126144
}
127145

128146
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -137,6 +155,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
137155
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
138156
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
139157
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
158+
159+
f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
160+
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
140161
}
141162

142163
// Validate config and returns error on failure
@@ -152,6 +173,12 @@ func (cfg *Config) Validate(limits validation.Limits) error {
152173
return cfg.HATrackerConfig.Validate()
153174
}
154175

176+
const (
177+
instanceLimitsMetric = "cortex_distributor_instance_limits"
178+
instanceLimitsMetricHelp = "Instance limits used by this distributor." // Must be same for all registrations.
179+
limitLabel = "limit"
180+
)
181+
155182
// New constructs a new Distributor
156183
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
157184
if cfg.IngesterClientFactory == nil {
@@ -200,6 +227,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
200227
limits: limits,
201228
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
202229
HATracker: haTracker,
230+
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
203231

204232
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
205233
Namespace: "cortex",
@@ -273,6 +301,31 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
273301
Help: "Unix timestamp of latest received sample per user.",
274302
}, []string{"user"}),
275303
}
304+
305+
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
306+
Name: instanceLimitsMetric,
307+
Help: instanceLimitsMetricHelp,
308+
ConstLabels: map[string]string{limitLabel: "max_inflight_push_requests"},
309+
}).Set(float64(cfg.InstanceLimits.MaxInflightPushRequests))
310+
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
311+
Name: instanceLimitsMetric,
312+
Help: instanceLimitsMetricHelp,
313+
ConstLabels: map[string]string{limitLabel: "max_ingestion_rate"},
314+
}).Set(cfg.InstanceLimits.MaxIngestionRate)
315+
316+
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
317+
Name: "cortex_distributor_inflight_push_requests",
318+
Help: "Current number of inflight push requests in distributor.",
319+
}, func() float64 {
320+
return float64(d.inflightPushRequests.Load())
321+
})
322+
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
323+
Name: "cortex_distributor_ingestion_rate_samples_per_second",
324+
Help: "Current ingestion rate in samples/sec that distributor is using to limit access.",
325+
}, func() float64 {
326+
return d.ingestionRate.Rate()
327+
})
328+
276329
d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
277330
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)
278331

@@ -294,11 +347,17 @@ func (d *Distributor) starting(ctx context.Context) error {
294347
}
295348

296349
func (d *Distributor) running(ctx context.Context) error {
350+
ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
351+
defer ingestionRateTicker.Stop()
352+
297353
for {
298354
select {
299355
case <-ctx.Done():
300356
return nil
301357

358+
case <-ingestionRateTicker.C:
359+
d.ingestionRate.Tick()
360+
302361
case err := <-d.subservicesWatcher.Chan():
303362
return errors.Wrap(err, "distributor subservice failed")
304363
}
@@ -443,6 +502,20 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
443502
return nil, err
444503
}
445504

505+
// We will report *this* request in the error too.
506+
inflight := d.inflightPushRequests.Inc()
507+
defer d.inflightPushRequests.Dec()
508+
509+
if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
510+
return nil, errTooManyInflightPushRequests
511+
}
512+
513+
if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
514+
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
515+
return nil, errMaxSamplesPushRateLimitReached
516+
}
517+
}
518+
446519
now := time.Now()
447520
d.activeUsers.UpdateUserTimestamp(userID, now)
448521

@@ -508,7 +581,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
508581
for _, ts := range req.Timeseries {
509582
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
510583
if len(ts.Samples) > 0 {
511-
latestSampleTimestampMs = math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
584+
latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
512585
}
513586

514587
if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
@@ -603,6 +676,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
603676
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
604677
}
605678

679+
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
680+
d.ingestionRate.Add(int64(totalN))
681+
606682
subRing := d.ingestersRing
607683

608684
// Obtain a subring if required.

pkg/distributor/distributor_test.go

+153
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,155 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
446446
}
447447
}
448448

449+
func TestDistributor_PushInstanceLimits(t *testing.T) {
450+
type testPush struct {
451+
samples int
452+
metadata int
453+
expectedError error
454+
}
455+
456+
tests := map[string]struct {
457+
preInflight int
458+
preRateSamples int // initial rate before first push
459+
pushes []testPush // rate is recomputed after each push
460+
461+
// limits
462+
inflightLimit int
463+
ingestionRateLimit float64
464+
465+
metricNames []string
466+
expectedMetrics string
467+
}{
468+
"no limits limit": {
469+
preInflight: 100,
470+
preRateSamples: 1000,
471+
472+
pushes: []testPush{
473+
{samples: 100, expectedError: nil},
474+
},
475+
476+
metricNames: []string{instanceLimitsMetric},
477+
expectedMetrics: `
478+
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
479+
# TYPE cortex_distributor_instance_limits gauge
480+
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
481+
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
482+
`,
483+
},
484+
"below inflight limit": {
485+
preInflight: 100,
486+
inflightLimit: 101,
487+
pushes: []testPush{
488+
{samples: 100, expectedError: nil},
489+
},
490+
491+
metricNames: []string{instanceLimitsMetric, "cortex_distributor_inflight_push_requests"},
492+
expectedMetrics: `
493+
# HELP cortex_distributor_inflight_push_requests Current number of inflight push requests in distributor.
494+
# TYPE cortex_distributor_inflight_push_requests gauge
495+
cortex_distributor_inflight_push_requests 100
496+
497+
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
498+
# TYPE cortex_distributor_instance_limits gauge
499+
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 101
500+
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
501+
`,
502+
},
503+
"hits inflight limit": {
504+
preInflight: 101,
505+
inflightLimit: 101,
506+
pushes: []testPush{
507+
{samples: 100, expectedError: errTooManyInflightPushRequests},
508+
},
509+
},
510+
"below ingestion rate limit": {
511+
preRateSamples: 500,
512+
ingestionRateLimit: 1000,
513+
514+
pushes: []testPush{
515+
{samples: 1000, expectedError: nil},
516+
},
517+
518+
metricNames: []string{instanceLimitsMetric, "cortex_distributor_ingestion_rate_samples_per_second"},
519+
expectedMetrics: `
520+
# HELP cortex_distributor_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that distributor is using to limit access.
521+
# TYPE cortex_distributor_ingestion_rate_samples_per_second gauge
522+
cortex_distributor_ingestion_rate_samples_per_second 600
523+
524+
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
525+
# TYPE cortex_distributor_instance_limits gauge
526+
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
527+
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 1000
528+
`,
529+
},
530+
"hits rate limit on first request, but second request can proceed": {
531+
preRateSamples: 1200,
532+
ingestionRateLimit: 1000,
533+
534+
pushes: []testPush{
535+
{samples: 100, expectedError: errMaxSamplesPushRateLimitReached},
536+
{samples: 100, expectedError: nil},
537+
},
538+
},
539+
540+
"below rate limit on first request, but hits the rate limit afterwards": {
541+
preRateSamples: 500,
542+
ingestionRateLimit: 1000,
543+
544+
pushes: []testPush{
545+
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
546+
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
547+
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
548+
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
549+
},
550+
},
551+
}
552+
553+
for testName, testData := range tests {
554+
testData := testData
555+
556+
t.Run(testName, func(t *testing.T) {
557+
limits := &validation.Limits{}
558+
flagext.DefaultValues(limits)
559+
560+
// Start all expected distributors
561+
distributors, _, r, regs := prepare(t, prepConfig{
562+
numIngesters: 3,
563+
happyIngesters: 3,
564+
numDistributors: 1,
565+
shardByAllLabels: true,
566+
limits: limits,
567+
maxInflightRequests: testData.inflightLimit,
568+
maxIngestionRate: testData.ingestionRateLimit,
569+
})
570+
defer stopAll(distributors, r)
571+
572+
d := distributors[0]
573+
d.inflightPushRequests.Add(int64(testData.preInflight))
574+
d.ingestionRate.Add(int64(testData.preRateSamples))
575+
576+
d.ingestionRate.Tick()
577+
578+
for _, push := range testData.pushes {
579+
request := makeWriteRequest(0, push.samples, push.metadata)
580+
_, err := d.Push(ctx, request)
581+
582+
if push.expectedError == nil {
583+
assert.Nil(t, err)
584+
} else {
585+
assert.Equal(t, push.expectedError, err)
586+
}
587+
588+
d.ingestionRate.Tick()
589+
590+
if testData.expectedMetrics != "" {
591+
assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(testData.expectedMetrics), testData.metricNames...))
592+
}
593+
}
594+
})
595+
}
596+
}
597+
449598
func TestDistributor_PushHAInstances(t *testing.T) {
450599
ctx = user.InjectOrgID(context.Background(), "user")
451600

@@ -1478,6 +1627,8 @@ type prepConfig struct {
14781627
limits *validation.Limits
14791628
numDistributors int
14801629
skipLabelNameValidation bool
1630+
maxInflightRequests int
1631+
maxIngestionRate float64
14811632
}
14821633

14831634
func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring, []*prometheus.Registry) {
@@ -1558,6 +1709,8 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
15581709
distributorCfg.DistributorRing.KVStore.Mock = kvStore
15591710
distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1"
15601711
distributorCfg.SkipLabelNameValidation = cfg.skipLabelNameValidation
1712+
distributorCfg.InstanceLimits.MaxInflightPushRequests = cfg.maxInflightRequests
1713+
distributorCfg.InstanceLimits.MaxIngestionRate = cfg.maxIngestionRate
15611714

15621715
if cfg.shuffleShardEnabled {
15631716
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle

0 commit comments

Comments
 (0)