Skip to content

WIP: Support ingesting exemplars into TSDB when blocks storage is enabled #4104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
473 changes: 429 additions & 44 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ message WriteResponse {}
message TimeSeries {
repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"];
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check if we need to sort the exemplars by timestamp as well (see the comment above the samples line)

}

message LabelPair {
Expand Down Expand Up @@ -60,3 +61,10 @@ message MetricMetadata {
// Metric identifies a single series by its label set alone
// (no samples attached).
message Metric {
// Series labels; non-nullable and mapped to the Go LabelAdapter
// custom type for zero-copy (un)marshalling.
repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"];
}

// Exemplar is a single exemplar observation attached to a series.
message Exemplar {
// Labels belonging to the exemplar itself; these are distinct from
// (and in addition to) the labels of the owning series.
repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"];
// Observed value of the exemplar.
double value = 2;
// Timestamp of the exemplar, in milliseconds (per the _ms suffix;
// presumably since the Unix epoch — confirm against callers).
int64 timestamp_ms = 3;
}
13 changes: 8 additions & 5 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
)

var (
expectedTimeseries = 100
expectedLabels = 20
expectedSamplesPerSeries = 10
expectedTimeseries = 100
expectedLabels = 20
expectedSamplesPerSeries = 10
expectedExemplarsPerSeries = 1

/*
We cannot pool these as pointer-to-slice because the place we use them is in WriteRequest which is generated from Protobuf
Expand All @@ -30,8 +31,9 @@ var (
timeSeriesPool = sync.Pool{
New: func() interface{} {
return &TimeSeries{
Labels: make([]LabelAdapter, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Labels: make([]LabelAdapter, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
}
},
}
Expand Down Expand Up @@ -295,5 +297,6 @@ func ReuseTimeseries(ts *TimeSeries) {
}
ts.Labels = ts.Labels[:0]
ts.Samples = ts.Samples[:0]
ts.Exemplars = ts.Exemplars[:0]
timeSeriesPool.Put(ts)
}
40 changes: 37 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ type Distributor struct {
// Metrics
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
receivedExemplars *prometheus.CounterVec
receivedMetadata *prometheus.CounterVec
incomingSamples *prometheus.CounterVec
incomingExemplars *prometheus.CounterVec
incomingMetadata *prometheus.CounterVec
nonHASamples *prometheus.CounterVec
dedupedSamples *prometheus.CounterVec
Expand Down Expand Up @@ -241,6 +243,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user"}),
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
}, []string{"user"}),
receivedMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_metadata_total",
Expand All @@ -251,6 +258,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "distributor_samples_in_total",
Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.",
}, []string{"user"}),
incomingExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_exemplars_in_total",
Help: "The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.",
}, []string{"user"}),
incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_metadata_in_total",
Expand Down Expand Up @@ -375,8 +387,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.HATracker.cleanupHATrackerMetricsForUser(userID)

d.receivedSamples.DeleteLabelValues(userID)
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingSamples.DeleteLabelValues(userID)
d.incomingExemplars.DeleteLabelValues(userID)
d.incomingMetadata.DeleteLabelValues(userID)
d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)
Expand Down Expand Up @@ -491,10 +505,23 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
samples = append(samples, s)
}

// Don't alloc a new empty slice unnecessarily
exemplars := ts.Exemplars
if len(ts.Exemplars) > 0 {
exemplars = make([]cortexpb.Exemplar, 0, len(ts.Exemplars))
for _, e := range ts.Exemplars {
if err := validation.ValidateExemplar(userID, ts.Labels, e); err != nil {
return emptyPreallocSeries, err
}
exemplars = append(exemplars, e)
}
}

return cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: ts.Labels,
Samples: samples,
Labels: ts.Labels,
Samples: samples,
Exemplars: exemplars,
},
},
nil
Expand Down Expand Up @@ -530,11 +557,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
removeReplica := false

numSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples)
numExemplars += len(ts.Exemplars)
}
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

Expand All @@ -546,6 +576,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
metadataKeys := make([]uint32, 0, len(req.Metadata))
seriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
validatedExemplars := 0

if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
Expand Down Expand Up @@ -642,6 +673,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
validatedSamples += len(ts.Samples)
validatedExemplars += len(ts.Exemplars)
}

for _, m := range req.Metadata {
Expand All @@ -660,6 +692,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
d.receivedExemplars.WithLabelValues(userID).Add((float64(validatedExemplars)))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
Expand All @@ -669,14 +702,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return &cortexpb.WriteResponse{}, firstPartialErr
}

totalN := validatedSamples + len(validatedMetadata)
totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
}
Expand Down
91 changes: 89 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,24 @@ func TestDistributor_MetricsCleanup(t *testing.T) {

metrics := []string{
"cortex_distributor_received_samples_total",
"cortex_distributor_received_exemplars_total",
"cortex_distributor_received_metadata_total",
"cortex_distributor_deduped_samples_total",
"cortex_distributor_samples_in_total",
"cortex_distributor_exemplars_in_total",
"cortex_distributor_metadata_in_total",
"cortex_distributor_non_ha_samples_received_total",
"cortex_distributor_latest_seen_sample_timestamp_seconds",
}

d.receivedSamples.WithLabelValues("userA").Add(5)
d.receivedSamples.WithLabelValues("userB").Add(10)
d.receivedExemplars.WithLabelValues("userA").Add(5)
d.receivedExemplars.WithLabelValues("userB").Add(10)
d.receivedMetadata.WithLabelValues("userA").Add(5)
d.receivedMetadata.WithLabelValues("userB").Add(10)
d.incomingSamples.WithLabelValues("userA").Add(5)
d.incomingExemplars.WithLabelValues("userA").Add(5)
d.incomingMetadata.WithLabelValues("userA").Add(5)
d.nonHASamples.WithLabelValues("userA").Add(5)
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
Expand Down Expand Up @@ -318,10 +323,19 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
cortex_distributor_received_samples_total{user="userA"} 5
cortex_distributor_received_samples_total{user="userB"} 10

# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
cortex_distributor_received_exemplars_total{user="userA"} 5
cortex_distributor_received_exemplars_total{user="userB"} 10

# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
# TYPE cortex_distributor_samples_in_total counter
cortex_distributor_samples_in_total{user="userA"} 5
`), metrics...))

# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
# TYPE cortex_distributor_exemplars_in_total counter
cortex_distributor_exemplars_in_total{user="userA"} 5
`), metrics...))

d.cleanupInactiveUser("userA")

Expand All @@ -346,9 +360,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="userB"} 10

# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
cortex_distributor_received_exemplars_total{user="userB"} 10

# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
# TYPE cortex_distributor_samples_in_total counter
`), metrics...))

# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
# TYPE cortex_distributor_exemplars_in_total counter
`), metrics...))
}

func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
Expand Down Expand Up @@ -1118,6 +1139,72 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
}
}

func TestDistributor_Push_ExemplarValidation(t *testing.T) {
tests := map[string]struct {
input cortexpb.Exemplar
errExpected bool
errMessage string
}{
"valid exemplar": {
input: cortexpb.Exemplar{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
TimestampMs: 1000,
},
errExpected: false,
},
"rejects exemplar with no labels": {
input: cortexpb.Exemplar{},
errExpected: true,
errMessage: `exemplar missing labels: series: {__name__="test"}`,
},
"rejects exemplar with timestamp": {
input: cortexpb.Exemplar{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}},
errExpected: true,
errMessage: `exemplar missing timestamp: series: {__name__="test"} labels: {foo="bar"}`,
},
"rejects exemplar with too long labelset": {
input: cortexpb.Exemplar{
TimestampMs: 1000,
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: strings.Repeat("0", 126)}}},
errExpected: true,
errMessage: fmt.Sprintf(`exemplar combined labelset too long: series: {__name__="test"} labels: {foo="%s"}`, strings.Repeat("0", 126)),
},
}

for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shuffleShardSize: 1,
})

req := &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
{
TimeSeries: &cortexpb.TimeSeries{
Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
Exemplars: []cortexpb.Exemplar{
tc.input,
},
},
},
},
}

_, err := ds[0].Push(ctx, req)
if tc.errExpected {
fromError, _ := status.FromError(err)
assert.Equal(t, tc.errMessage, fromError.Message())
} else {
assert.Nil(t, err)
}
})
}
}

func BenchmarkDistributor_Push(b *testing.B) {
const (
numSeriesPerRequest = 1000
Expand Down
30 changes: 30 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -751,6 +752,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
var (
succeededSamplesCount = 0
failedSamplesCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
Expand Down Expand Up @@ -847,6 +850,30 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
return copiedLabels
})
}

// app.AppendExemplar currently doesn't create the series, it must
// already exist. If it does not then drop. TODO(mdisibio) - better way to handle?
if ref == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Prometheus we skip the exemplar and increment a counter; it is probably best to do the same here. Because right now a TimeSeries only contains a sample OR an exemplar, continuing here is valid imo. If we included samples and exemplars in the same TimeSeries, I think the only case where we could reach this line (after appending the sample) and still not have a valid reference ID is if appending the sample itself failed, which afaict only happens in the event of an invalid labelset.

Copy link
Contributor Author

@mdisibio mdisibio Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is true. We have two choices, we could count them as failed or discarded due to validation. In this case I think failed is more appropriate because there isn't anything wrong with the exemplar itself, it is more that it couldn't be ingested due to a limitation in the current tsdb implementation. If/when tsdb AppendExemplar is updated to create the series then the same data would be ingested successfully.

Update: Went with failed approach.

continue
}

for _, ex := range ts.Exemplars {
e := exemplar.Exemplar{
Value: ex.Value,
Ts: ex.TimestampMs,
HasTs: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we're ingesting via remote write, as long as all exemplars have the same value for this field (whether it's true or false) we would be able to dedupe properly

Labels: cortexpb.FromLabelAdaptersToLabelsWithCopy(ex.Labels),
}

if _, err = app.AppendExemplar(ref, nil, e); err == nil {
succeededExemplarsCount++
continue
}

// Error adding exemplar
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(ex.TimestampMs), ts.Labels) })
failedExemplarsCount++
}
}

// At this point all samples have been added to the appender, so we can track the time it took.
Expand All @@ -868,6 +895,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
// which will be converted into an HTTP 5xx and the client should/will retry.
i.metrics.ingestedSamples.Add(float64(succeededSamplesCount))
i.metrics.ingestedSamplesFail.Add(float64(failedSamplesCount))
i.metrics.ingestedExemplars.Add(float64(succeededExemplarsCount))
i.metrics.ingestedExemplarsFail.Add(float64(failedExemplarsCount))

if sampleOutOfBoundsCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
Expand Down Expand Up @@ -1479,6 +1508,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
WALSegmentSize: i.cfg.BlocksStorageConfig.TSDB.WALSegmentSizeBytes,
SeriesLifecycleCallback: userDB,
BlocksToDelete: userDB.blocksToDelete,
MaxExemplars: i.cfg.BlocksStorageConfig.TSDB.MaxExemplars,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
Expand Down
Loading