Skip to content

Optimized metric name extraction in distributor #4001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ func (d *Distributor) tokenForLabels(userID string, labels []cortexpb.LabelAdapt
return shardByAllLabels(userID, labels), nil
}

metricName, err := extract.MetricNameFromLabelAdapters(labels)
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(labels)
if err != nil {
return 0, err
}
return shardByMetricName(userID, metricName), nil
return shardByMetricName(userID, unsafeMetricName), nil
}

func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 {
Expand All @@ -349,6 +349,8 @@ func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32
return shardByUser(userID)
}

// shardByMetricName returns the token for the given metric. The provided metricName
// is guaranteed to not be retained.
func shardByMetricName(userID string, metricName string) uint32 {
h := shardByUser(userID)
h = ingester_client.HashAdd32(h, metricName)
Expand Down Expand Up @@ -410,16 +412,16 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// Validates a single series from a write request. Will remove labels if
// any are configured to be dropped for the user ID.
// Returns the validated series with it's labels/samples, and any error.
// The returned error may retain the series labels.
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) {
d.labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
return emptyPreallocSeries, err
}

metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
samples := make([]cortexpb.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil {
if err := validation.ValidateSample(d.limits, userID, ts.Labels, s); err != nil {
return emptyPreallocSeries, err
}
samples = append(samples, s)
Expand Down Expand Up @@ -549,6 +551,8 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
if validationErr != nil && firstPartialErr == nil {
// The series labels may be retained by validationErr but that's not a problem for this
// use case because we format it calling Error() and then we discard it.
firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
}

Expand Down
31 changes: 29 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
}
}

func BenchmarkDistributor_PushOnError(b *testing.B) {
func BenchmarkDistributor_Push(b *testing.B) {
const (
numSeriesPerRequest = 1000
)
Expand All @@ -979,6 +979,29 @@ func BenchmarkDistributor_PushOnError(b *testing.B) {
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
expectedErr string
}{
"all samples successfully pushed": {
prepareConfig: func(limits *validation.Limits) {},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "",
},
"ingestion rate limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.IngestionRate = 1
Expand Down Expand Up @@ -1202,7 +1225,11 @@ func BenchmarkDistributor_PushOnError(b *testing.B) {

for n := 0; n < b.N; n++ {
_, err := distributor.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API))
if err == nil || !strings.Contains(err.Error(), testData.expectedErr) {

if testData.expectedErr == "" && err != nil {
b.Fatalf("no error expected but got %v", err)
}
if testData.expectedErr != "" && (err == nil || !strings.Contains(err.Error(), testData.expectedErr)) {
b.Fatalf("expected %v error but got %v", testData.expectedErr, err)
}
}
Expand Down
18 changes: 15 additions & 3 deletions pkg/util/extract/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,24 @@ var (
)

// MetricNameFromLabelAdapters extracts the metric name from a list of LabelPairs.
// The returned metric name string is a copy of the label value.
func MetricNameFromLabelAdapters(labels []cortexpb.LabelAdapter) (string, error) {
unsafeMetricName, err := UnsafeMetricNameFromLabelAdapters(labels)
if err != nil {
return "", err
}

// Force a string copy since LabelAdapter is often a pointer into
// a large gRPC buffer which we don't want to keep alive on the heap.
return string([]byte(unsafeMetricName)), nil
}

// UnsafeMetricNameFromLabelAdapters extracts the metric name from a list of LabelPairs.
// The returned metric name string is a reference to the label value (no copy).
func UnsafeMetricNameFromLabelAdapters(labels []cortexpb.LabelAdapter) (string, error) {
for _, label := range labels {
if label.Name == model.MetricNameLabel {
// Force a string copy since LabelAdapter is often a pointer into
// a large gRPC buffer which we don't want to keep alive on the heap.
return string([]byte(label.Value)), nil
return label.Value, nil
}
}
return "", errNoMetricNameLabel
Expand Down
16 changes: 10 additions & 6 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,18 @@ type SampleValidationConfig interface {
}

// ValidateSample returns an err if the sample is invalid.
func ValidateSample(cfg SampleValidationConfig, userID string, metricName string, s cortexpb.Sample) ValidationError {
// The returned error may retain the provided series labels.
func ValidateSample(cfg SampleValidationConfig, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError {
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)

if cfg.RejectOldSamples(userID) && model.Time(s.TimestampMs) < model.Now().Add(-cfg.RejectOldSamplesMaxAge(userID)) {
DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc()
return newSampleTimestampTooOldError(metricName, s.TimestampMs)
return newSampleTimestampTooOldError(unsafeMetricName, s.TimestampMs)
}

if model.Time(s.TimestampMs) > model.Now().Add(cfg.CreationGracePeriod(userID)) {
DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc()
return newSampleTimestampTooNewError(metricName, s.TimestampMs)
return newSampleTimestampTooNewError(unsafeMetricName, s.TimestampMs)
}

return nil
Expand All @@ -106,17 +109,18 @@ type LabelValidationConfig interface {
}

// ValidateLabels returns an err if the labels are invalid.
// The returned error may retain the provided series labels.
func ValidateLabels(cfg LabelValidationConfig, userID string, ls []cortexpb.LabelAdapter, skipLabelNameValidation bool) ValidationError {
if cfg.EnforceMetricName(userID) {
metricName, err := extract.MetricNameFromLabelAdapters(ls)
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls)
if err != nil {
DiscardedSamples.WithLabelValues(missingMetricName, userID).Inc()
return newNoMetricNameError()
}

if !model.IsValidMetricName(model.LabelValue(metricName)) {
if !model.IsValidMetricName(model.LabelValue(unsafeMetricName)) {
DiscardedSamples.WithLabelValues(invalidMetricName, userID).Inc()
return newInvalidMetricNameError(metricName)
return newInvalidMetricNameError(unsafeMetricName)
}
}

Expand Down