Skip to content

Add convertToTimeseries for Sum, LastValue, and MinMaxSumCount #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 244 additions & 4 deletions exporters/metric/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ import (
"bytes"
"context"
"fmt"
"log"
"net/http"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
apimetric "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/export/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
Expand All @@ -44,6 +47,26 @@ func (e *Exporter) ExportKindFor(*apimetric.Descriptor, aggregation.Kind) metric

// Export forwards metrics to Cortex from the SDK
func (e *Exporter) Export(_ context.Context, checkpointSet metric.CheckpointSet) error {
timeseries, err := e.ConvertToTimeSeries(checkpointSet)
if err != nil {
return err
}

message, buildMessageErr := e.buildMessage(timeseries)
if buildMessageErr != nil {
return buildMessageErr
}

request, buildRequestErr := e.buildRequest(message)
if buildRequestErr != nil {
return buildRequestErr
}

sendRequestErr := e.sendRequest(request)
if sendRequestErr != nil {
return sendRequestErr
}

return nil
}

Expand Down Expand Up @@ -86,11 +109,228 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller
return pusher, nil
}

// addHeaders adds required headers as well as all headers in Header map to a http
// request.
// ConvertToTimeSeries converts a CheckpointSet to a slice of TimeSeries pointers
func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*prompb.TimeSeries, error) {
var aggError error
var timeSeries []*prompb.TimeSeries

// Iterate over each record in the checkpoint set and convert to TimeSeries
aggError = checkpointSet.ForEach(e, func(record metric.Record) error {
// Convert based on aggregation type
agg := record.Aggregation()

// Check if aggregation has Sum value
if sum, ok := agg.(aggregation.Sum); ok {
tSeries, err := convertFromSum(record, sum)
if err != nil {
return err
}

timeSeries = append(timeSeries, tSeries)
}

// Check if aggregation has MinMaxSumCount value
if minMaxSumCount, ok := agg.(aggregation.MinMaxSumCount); ok {
tSeries, err := convertFromMinMaxSumCount(record, minMaxSumCount)
if err != nil {
return err
}

timeSeries = append(timeSeries, tSeries...)
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
tSeries, err := convertFromLastValue(record, lastValue)
if err != nil {
return err
}

timeSeries = append(timeSeries, tSeries)
}

return nil
})

// Check if error was returned in checkpointSet.ForEach()
if aggError != nil {
return nil, aggError
}

return timeSeries, nil
}

// convertFromSum returns a single TimeSeries based on a Record with a Sum aggregation
func convertFromSum(record metric.Record, sum aggregation.Sum) (*prompb.TimeSeries, error) {
// Get Sum value
value, err := sum.Sum()
if err != nil {
return nil, err
}
// Create sample from Sum value
sample := prompb.Sample{
Value: value.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name := sanitize(record.Descriptor().Name())
labels := createLabelSet(record, "__name__", name)

// Create TimeSeries and return
tSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{sample},
Labels: labels,
}

return tSeries, nil
}

// convertFromLastValue returns a single TimeSeries based on a Record with a LastValue aggregation
func convertFromLastValue(record metric.Record, lastValue aggregation.LastValue) (*prompb.TimeSeries, error) {
// Get value
value, _, err := lastValue.LastValue()
if err != nil {
return nil, err
}

// Create sample from Last value
sample := prompb.Sample{
Value: value.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name := sanitize(record.Descriptor().Name())
labels := createLabelSet(record, "__name__", name)

// Create TimeSeries and return
tSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{sample},
Labels: labels,
}

return tSeries, nil
}

// convertFromMinMaxSumCount returns 4 TimeSeries for the min, max, sum, and count from the mmsc aggregation
func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.MinMaxSumCount) ([]*prompb.TimeSeries, error) {
// Convert Min
min, err := minMaxSumCount.Min()
if err != nil {
return nil, err
}
minSample := prompb.Sample{
Value: min.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name := sanitize(record.Descriptor().Name() + "_min")
labels := createLabelSet(record, "__name__", name)

// Create TimeSeries
minTimeSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{minSample},
Labels: labels,
}

// Convert Max
max, err := minMaxSumCount.Max()
if err != nil {
return nil, err
}
maxSample := prompb.Sample{
Value: max.CoerceToFloat64(record.Descriptor().NumberKind()),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name = sanitize(record.Descriptor().Name() + "_max")
labels = createLabelSet(record, "__name__", name)

// Create TimeSeries
maxTimeSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{maxSample},
Labels: labels,
}

// Convert Count
count, err := minMaxSumCount.Count()
if err != nil {
return nil, err
}
countSample := prompb.Sample{
Value: float64(count),
Timestamp: record.EndTime().Unix(), // Convert time to Unix (int64)
}

// Create labels, including metric name
name = sanitize(record.Descriptor().Name() + "_count")
labels = createLabelSet(record, "__name__", name)

// Create TimeSeries
countTimeSeries := &prompb.TimeSeries{
Samples: []prompb.Sample{countSample},
Labels: labels,
}

tSeries := []*prompb.TimeSeries{
minTimeSeries, maxTimeSeries, countTimeSeries,
}

return tSeries, nil
}

// createLabelSet combines labels from a Record, resource, and extra labels to
// create a slice of prompb.Label
func createLabelSet(record metric.Record, extras ...string) []*prompb.Label {
// Map ensure no duplicate label names
labelMap := map[string]prompb.Label{}

// mergeLabels merges Record and Resource labels into a single set, giving
// precedence to the record's labels.
mi := label.NewMergeIterator(record.Labels(), record.Resource().LabelSet())
for mi.Next() {
label := mi.Label()
key := string(label.Key)
labelMap[key] = prompb.Label{
Name: sanitize(key),
Value: label.Value.Emit(),
}
}

// Add extra labels created by the exporter like the metric name
// or labels to represent histogram buckets
for i := 0; i < len(extras); i += 2 {
// Ensure even number of extras (key : value)
if i+1 >= len(extras) {
break
}

// Ensure label doesn't exist. If it does, notify user that a user created label
// is being overwritten by a Prometheus reserved label (e.g. 'le' for histograms)
_, found := labelMap[extras[i]]
if found {
log.Printf("Label %s is overwritten. Check if Prometheus reserved labels are used.\n", extras[i])
}
labelMap[extras[i]] = prompb.Label{
Name: sanitize(extras[i]),
Value: extras[i+1],
}
}

// Create slice of labels from labelMap and return
res := make([]*prompb.Label, 0, len(labelMap))
for _, lb := range labelMap {
currentLabel := lb
res = append(res, &currentLabel)
}

return res
}

// AddHeaders adds required headers as well as all headers in Header map to a http request.
func (e *Exporter) addHeaders(req *http.Request) {
// Cortex expects Snappy-compressed protobuf messages. These three headers are
// hard-coded as they should be on every request.
// Cortex expects Snappy-compressed protobuf messages. These two headers are hard-coded as they
// should be on every request.
req.Header.Add("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
Expand Down
Loading