Skip to content

Commit 8a2e2c1

Browse files
authored
Optimize distributor push on error (#3990)
* Added BenchmarkDistributor_PushOnError Signed-off-by: Marco Pracucci <[email protected]> * Optimized distributor Push() on validation error Signed-off-by: Marco Pracucci <[email protected]> * Added CHANGELOG entry Signed-off-by: Marco Pracucci <[email protected]> * Fix linter error Signed-off-by: Marco Pracucci <[email protected]> * Simplified ValidationError Signed-off-by: Marco Pracucci <[email protected]>
1 parent c13194e commit 8a2e2c1

File tree

6 files changed

+480
-85
lines changed

6 files changed

+480
-85
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
1919
* [ENHANCEMENT] Query-frontend: reduced memory allocations when serializing query response. #3964
2020
* [ENHANCEMENT] Ingester: reduce CPU and memory when a high number of errors is returned by the ingester on the write path with the blocks storage. #3969 #3971 #3973
21+
* [ENHANCEMENT] Distributor: reduce CPU and memory when a high number of errors is returned by the distributor on the write path. #3990
2122
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
2223
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
2324
* [BUGFIX] Querier: streamline tracing spans. #3924

pkg/distributor/distributor.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
410410
// Validates a single series from a write request. Will remove labels if
411411
// any are configured to be dropped for the user ID.
412412
// Returns the validated series with its labels/samples, and any error.
413-
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, error) {
413+
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) {
414414
d.labelsHistogram.Observe(float64(len(ts.Labels)))
415415
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
416416
return emptyPreallocSeries, err
@@ -544,12 +544,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
544544
}
545545

546546
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
547-
validatedSeries, err := d.validateSeries(ts, userID, skipLabelNameValidation)
547+
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation)
548548

549549
// Errors in validation are considered non-fatal, as one series in a request may contain
550550
// invalid data but all the remaining series could be perfectly valid.
551-
if err != nil && firstPartialErr == nil {
552-
firstPartialErr = err
551+
if validationErr != nil && firstPartialErr == nil {
552+
firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
553553
}
554554

555555
// validateSeries would have returned an emptyPreallocSeries if there were no valid samples.

pkg/distributor/distributor_test.go

+255
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,247 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
969969
}
970970
}
971971

972+
func BenchmarkDistributor_PushOnError(b *testing.B) {
973+
const (
974+
numSeriesPerRequest = 1000
975+
)
976+
977+
tests := map[string]struct {
978+
prepareConfig func(limits *validation.Limits)
979+
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
980+
expectedErr string
981+
}{
982+
"ingestion rate limit reached": {
983+
prepareConfig: func(limits *validation.Limits) {
984+
limits.IngestionRate = 1
985+
limits.IngestionBurstSize = 1
986+
},
987+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
988+
metrics := make([]labels.Labels, numSeriesPerRequest)
989+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
990+
991+
for i := 0; i < numSeriesPerRequest; i++ {
992+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
993+
for i := 0; i < 10; i++ {
994+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
995+
}
996+
997+
metrics[i] = lbls.Labels()
998+
samples[i] = cortexpb.Sample{
999+
Value: float64(i),
1000+
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
1001+
}
1002+
}
1003+
1004+
return metrics, samples
1005+
},
1006+
expectedErr: "ingestion rate limit",
1007+
},
1008+
"too many labels limit reached": {
1009+
prepareConfig: func(limits *validation.Limits) {
1010+
limits.MaxLabelNamesPerSeries = 30
1011+
},
1012+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
1013+
metrics := make([]labels.Labels, numSeriesPerRequest)
1014+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
1015+
1016+
for i := 0; i < numSeriesPerRequest; i++ {
1017+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
1018+
for i := 1; i < 31; i++ {
1019+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
1020+
}
1021+
1022+
metrics[i] = lbls.Labels()
1023+
samples[i] = cortexpb.Sample{
1024+
Value: float64(i),
1025+
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
1026+
}
1027+
}
1028+
1029+
return metrics, samples
1030+
},
1031+
expectedErr: "series has too many labels",
1032+
},
1033+
"max label name length limit reached": {
1034+
prepareConfig: func(limits *validation.Limits) {
1035+
limits.MaxLabelNameLength = 1024
1036+
},
1037+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
1038+
metrics := make([]labels.Labels, numSeriesPerRequest)
1039+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
1040+
1041+
for i := 0; i < numSeriesPerRequest; i++ {
1042+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
1043+
for i := 0; i < 10; i++ {
1044+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
1045+
}
1046+
1047+
// Add a label with a very long name.
1048+
lbls.Set(fmt.Sprintf("xxx_%0.2000d", 1), "xxx")
1049+
1050+
metrics[i] = lbls.Labels()
1051+
samples[i] = cortexpb.Sample{
1052+
Value: float64(i),
1053+
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
1054+
}
1055+
}
1056+
1057+
return metrics, samples
1058+
},
1059+
expectedErr: "label name too long",
1060+
},
1061+
"max label value length limit reached": {
1062+
prepareConfig: func(limits *validation.Limits) {
1063+
limits.MaxLabelValueLength = 1024
1064+
},
1065+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
1066+
metrics := make([]labels.Labels, numSeriesPerRequest)
1067+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
1068+
1069+
for i := 0; i < numSeriesPerRequest; i++ {
1070+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
1071+
for i := 0; i < 10; i++ {
1072+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
1073+
}
1074+
1075+
// Add a label with a very long value.
1076+
lbls.Set("xxx", fmt.Sprintf("xxx_%0.2000d", 1))
1077+
1078+
metrics[i] = lbls.Labels()
1079+
samples[i] = cortexpb.Sample{
1080+
Value: float64(i),
1081+
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
1082+
}
1083+
}
1084+
1085+
return metrics, samples
1086+
},
1087+
expectedErr: "label value too long",
1088+
},
1089+
"timestamp too old": {
1090+
prepareConfig: func(limits *validation.Limits) {
1091+
limits.RejectOldSamples = true
1092+
limits.RejectOldSamplesMaxAge = time.Hour
1093+
},
1094+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
1095+
metrics := make([]labels.Labels, numSeriesPerRequest)
1096+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
1097+
1098+
for i := 0; i < numSeriesPerRequest; i++ {
1099+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
1100+
for i := 0; i < 10; i++ {
1101+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
1102+
}
1103+
1104+
metrics[i] = lbls.Labels()
1105+
samples[i] = cortexpb.Sample{
1106+
Value: float64(i),
1107+
TimestampMs: time.Now().Add(-2*time.Hour).UnixNano() / int64(time.Millisecond),
1108+
}
1109+
}
1110+
1111+
return metrics, samples
1112+
},
1113+
expectedErr: "timestamp too old",
1114+
},
1115+
"timestamp too new": {
1116+
prepareConfig: func(limits *validation.Limits) {
1117+
limits.CreationGracePeriod = time.Minute
1118+
},
1119+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
1120+
metrics := make([]labels.Labels, numSeriesPerRequest)
1121+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
1122+
1123+
for i := 0; i < numSeriesPerRequest; i++ {
1124+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
1125+
for i := 0; i < 10; i++ {
1126+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
1127+
}
1128+
1129+
metrics[i] = lbls.Labels()
1130+
samples[i] = cortexpb.Sample{
1131+
Value: float64(i),
1132+
TimestampMs: time.Now().Add(time.Hour).UnixNano() / int64(time.Millisecond),
1133+
}
1134+
}
1135+
1136+
return metrics, samples
1137+
},
1138+
expectedErr: "timestamp too new",
1139+
},
1140+
}
1141+
1142+
for testName, testData := range tests {
1143+
b.Run(testName, func(b *testing.B) {
1144+
// Create an in-memory KV store for the ring with 1 ingester registered.
1145+
kvStore := consul.NewInMemoryClient(ring.GetCodec())
1146+
err := kvStore.CAS(context.Background(), ring.IngesterRingKey,
1147+
func(_ interface{}) (interface{}, bool, error) {
1148+
d := &ring.Desc{}
1149+
d.AddIngester("ingester-1", "127.0.0.1", "", ring.GenerateTokens(128, nil), ring.ACTIVE, time.Now())
1150+
return d, true, nil
1151+
},
1152+
)
1153+
require.NoError(b, err)
1154+
1155+
ingestersRing, err := ring.New(ring.Config{
1156+
KVStore: kv.Config{Mock: kvStore},
1157+
HeartbeatTimeout: 60 * time.Minute,
1158+
ReplicationFactor: 1,
1159+
}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
1160+
require.NoError(b, err)
1161+
require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingestersRing))
1162+
b.Cleanup(func() {
1163+
require.NoError(b, services.StopAndAwaitTerminated(context.Background(), ingestersRing))
1164+
})
1165+
1166+
test.Poll(b, time.Second, 1, func() interface{} {
1167+
return ingestersRing.InstancesCount()
1168+
})
1169+
1170+
// Prepare the distributor configuration.
1171+
var distributorCfg Config
1172+
var clientConfig client.Config
1173+
limits := validation.Limits{}
1174+
flagext.DefaultValues(&distributorCfg, &clientConfig, &limits)
1175+
1176+
limits.IngestionRate = 0 // Unlimited.
1177+
testData.prepareConfig(&limits)
1178+
1179+
distributorCfg.ShardByAllLabels = true
1180+
distributorCfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
1181+
return &noopIngester{}, nil
1182+
}
1183+
1184+
overrides, err := validation.NewOverrides(limits, nil)
1185+
require.NoError(b, err)
1186+
1187+
// Start the distributor.
1188+
distributor, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, nil, log.NewNopLogger())
1189+
require.NoError(b, err)
1190+
require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor))
1191+
1192+
b.Cleanup(func() {
1193+
require.NoError(b, services.StopAndAwaitTerminated(context.Background(), distributor))
1194+
})
1195+
1196+
// Prepare the series to remote write before starting the benchmark.
1197+
metrics, samples := testData.prepareSeries()
1198+
1199+
// Run the benchmark.
1200+
b.ReportAllocs()
1201+
b.ResetTimer()
1202+
1203+
for n := 0; n < b.N; n++ {
1204+
_, err := distributor.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API))
1205+
if err == nil || !strings.Contains(err.Error(), testData.expectedErr) {
1206+
b.Fatalf("expected %v error but got %v", testData.expectedErr, err)
1207+
}
1208+
}
1209+
})
1210+
}
1211+
}
1212+
9721213
func TestSlowQueries(t *testing.T) {
9731214
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
9741215
nIngesters := 3
@@ -1661,6 +1902,20 @@ func (i *mockIngester) countCalls(name string) int {
16611902
return i.calls[name]
16621903
}
16631904

1905+
// noopIngester is a mocked ingester which does nothing.
1906+
type noopIngester struct {
1907+
client.IngesterClient
1908+
grpc_health_v1.HealthClient
1909+
}
1910+
1911+
func (i *noopIngester) Close() error {
1912+
return nil
1913+
}
1914+
1915+
func (i *noopIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
1916+
return nil, nil
1917+
}
1918+
16641919
type stream struct {
16651920
grpc.ClientStream
16661921
i int

0 commit comments

Comments
 (0)