From 2be1d9b64df98961a17115adedf8757d524d118b Mon Sep 17 00:00:00 2001 From: Juraj Michalek Date: Thu, 3 Oct 2024 16:40:46 +0200 Subject: [PATCH 1/4] feat: prometheus translation add support for rw2 --- .../prometheusremotewrite/helper_v2.go | 144 +++++++++++++++++ .../metrics_to_prw_v2.go | 151 ++++++++++++++++++ .../metrics_to_prw_v2_test.go | 26 +++ .../number_data_points_v2.go | 44 +++++ 4 files changed, 365 insertions(+) create mode 100644 pkg/translator/prometheusremotewrite/helper_v2.go create mode 100644 pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go create mode 100644 pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go create mode 100644 pkg/translator/prometheusremotewrite/number_data_points_v2.go diff --git a/pkg/translator/prometheusremotewrite/helper_v2.go b/pkg/translator/prometheusremotewrite/helper_v2.go new file mode 100644 index 000000000000..4bb73dd7853d --- /dev/null +++ b/pkg/translator/prometheusremotewrite/helper_v2.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" + +import ( + "fmt" + prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "go.opentelemetry.io/collector/pdata/pcommon" + conventions "go.opentelemetry.io/collector/semconv/v1.25.0" + "log" + "slices" +) + +// TODO implement this fully +// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. +// Otherwise it creates a new one and returns that, and true. +func (c *prometheusConverterV2) getOrCreateTimeSeries(lbls labels.Labels) (*writev2.TimeSeries, bool) { + h := lbls.Hash() + ts := c.unique[h] + + if ts != nil { + if c.isSameMetricV2(ts, lbls) { + // We already have this metric + return ts, false + } + + // Look for a matching conflict + for _, cTS := range c.conflicts[h] { + if c.isSameMetricV2(cTS, lbls) { + // We already have this metric + return cTS, false + } + } + + // New conflict + ts = &writev2.TimeSeries{} + ts.LabelsRefs = c.symbolTable.SymbolizeLabels(lbls, ts.LabelsRefs) + c.conflicts[h] = append(c.conflicts[h], ts) + return ts, true + + } + + // This metric is new + ts = &writev2.TimeSeries{} + ts.LabelsRefs = c.symbolTable.SymbolizeLabels(lbls, ts.LabelsRefs) + c.unique[0] = ts + return ts, true +} + +// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and +// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. +func createAttributesV2(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, + ignoreAttrs []string, logOnOverwrite bool, extras ...string) labels.Labels { + resourceAttrs := resource.Attributes() + serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) + instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) + + // Calculate the maximum possible number of labels we could return so we can preallocate l + maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 + + if haveServiceName { + maxLabelCount++ + } + + if haveInstanceID { + maxLabelCount++ + } + + // map ensures no duplicate label name + l := make(map[string]string, maxLabelCount) + + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + serieslabels := labels.Labels{} + // XXX: Should we always drop service namespace/service name/service instance ID from the labels + // (as they get mapped to other Prometheus labels)? + attributes.Range(func(key string, value pcommon.Value) bool { + if !slices.Contains(ignoreAttrs, key) { + serieslabels = append(serieslabels, labels.Label{Name: key, Value: value.AsString()}) + } + return true + }) + // Afaik not needed + //sort.Stable(ByLabelName(labels)) + + for _, label := range serieslabels { + var finalKey = prometheustranslator.NormalizeLabel(label.Name) + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingValue + ";" + label.Value + } else { + l[finalKey] = label.Value + } + } + + // Map service.name + service.namespace to job + if haveServiceName { + val := serviceName.AsString() + if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok { + val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val) + } + l[model.JobLabel] = val + } + // Map service.instance.id to instance + if haveInstanceID { + l[model.InstanceLabel] = instance.AsString() + } + for key, value := range externalLabels { + // External labels have already been sanitized + if _, alreadyExists := l[key]; alreadyExists { + // Skip external labels if they are overridden by metric attributes + continue + } + l[key] = value + } + + for i := 0; i < len(extras); i += 2 { + if i+1 >= len(extras) { + break + } + _, found := l[extras[i]] + if found && logOnOverwrite { + log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.") + } + // internal labels should be maintained + name := extras[i] + if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") { + name = prometheustranslator.NormalizeLabel(name) + } + l[name] = extras[i+1] + } + + // TODO what was this for?? + //labels = labels[:0] + //for k, v := range l { + // labels = append(labels, prompb.Label{Name: k, Value: v}) + //} + + return serieslabels +} diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go new file mode 100644 index 000000000000..40ffaf55541f --- /dev/null +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go @@ -0,0 +1,151 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" + +import ( + "errors" + "fmt" + "github.com/prometheus/prometheus/model/labels" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "strconv" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" + + prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +) + +// FromMetricsV2 converts pmetric.Metrics to Prometheus remote write format 2.0. +func FromMetricsV2(md pmetric.Metrics, settings Settings) (map[string]*writev2.TimeSeries, error) { + c := newPrometheusConverterV2() + errs := c.fromMetrics(md, settings) + tss := c.timeSeries() + out := make(map[string]*writev2.TimeSeries, len(tss)) + for i := range tss { + out[strconv.Itoa(i)] = &tss[i] + } + + return out, errs +} + +// prometheusConverter converts from OTel write format to Prometheus write format. +type prometheusConverterV2 struct { + unique map[uint64]*writev2.TimeSeries + conflicts map[uint64][]*writev2.TimeSeries + symbolTable writev2.SymbolsTable +} + +func newPrometheusConverterV2() *prometheusConverterV2 { + return &prometheusConverterV2{ + unique: map[uint64]*writev2.TimeSeries{}, + conflicts: map[uint64][]*writev2.TimeSeries{}, + symbolTable: writev2.NewSymbolTable(), + } +} + +// fromMetrics converts pmetric.Metrics to Prometheus remote write format. +func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) { + resourceMetricsSlice := md.ResourceMetrics() + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + resource := resourceMetrics.Resource() + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + // keep track of the most recent timestamp in the ResourceMetrics for + // use with the "target" info metric + var mostRecentTimestamp pcommon.Timestamp + for j := 0; j < scopeMetricsSlice.Len(); j++ { + metricSlice := scopeMetricsSlice.At(j).Metrics() + + // TODO: decide if instrumentation library information should be exported as labels + for k := 0; k < metricSlice.Len(); k++ { + metric := metricSlice.At(k) + mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) + + if !isValidAggregationTemporality(metric) { + errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())) + continue + } + + promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + + // handle individual metrics based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName) + case pmetric.MetricTypeSum: + // TODO implement + case pmetric.MetricTypeHistogram: + // TODO implement + case pmetric.MetricTypeExponentialHistogram: + // TODO implement + case pmetric.MetricTypeSummary: + // TODO implement + default: + errs = multierr.Append(errs, errors.New("unsupported metric type")) + } + } + } + // TODO implement + //addResourceTargetInfov2(resource, settings, mostRecentTimestamp, c) + } + + return +} + +// timeSeries returns a slice of the writev2.TimeSeries that were converted from OTel format. +func (c *prometheusConverterV2) timeSeries() []writev2.TimeSeries { + conflicts := 0 + for _, ts := range c.conflicts { + conflicts += len(ts) + } + allTS := make([]writev2.TimeSeries, 0, len(c.unique)+conflicts) + for _, ts := range c.unique { + allTS = append(allTS, *ts) + } + for _, cTS := range c.conflicts { + for _, ts := range cTS { + allTS = append(allTS, *ts) + } + } + + return allTS +} + +func (c *prometheusConverterV2) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { + // TODO implement + return +} + +func (c *prometheusConverterV2) addSample(sample *writev2.Sample, lbls labels.Labels) *writev2.TimeSeries { + if sample == nil || len(lbls) == 0 { + // This shouldn't happen + return nil + } + + ts, _ := c.getOrCreateTimeSeries(lbls) + ts.Samples = append(ts.Samples, *sample) + return ts +} + +func (c *prometheusConverterV2) isSameMetricV2(ts *writev2.TimeSeries, lbls labels.Labels) bool { + b := labels.NewScratchBuilder(0) + seriesLabels := ts.ToLabels(&b, c.symbolTable.Symbols()) + + if len(seriesLabels) != len(lbls) { + return false + } + for i, l := range seriesLabels { + if l.Name != seriesLabels[i].Name || l.Value != seriesLabels[i].Value { + return false + } + } + return true +} diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go new file mode 100644 index 000000000000..918705d5e237 --- /dev/null +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestFromMetricsV2(t *testing.T) { + settings := Settings{ + Namespace: "", + ExternalLabels: nil, + DisableTargetInfo: false, + ExportCreatedMetric: false, + AddMetricSuffixes: false, + SendMetadata: false, + } + + payload := createExportRequest(5, 0, 1, 3, 0) + + tsMap, err := FromMetricsV2(payload.Metrics(), settings) + require.NoError(t, err) + println(tsMap) +} diff --git a/pkg/translator/prometheusremotewrite/number_data_points_v2.go b/pkg/translator/prometheusremotewrite/number_data_points_v2.go new file mode 100644 index 000000000000..c8b64d14da0a --- /dev/null +++ b/pkg/translator/prometheusremotewrite/number_data_points_v2.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" + +import ( + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/value" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, name string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + labels := createAttributesV2( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + name, + ) + sample := &writev2.Sample{ + // convert ns to ms + Timestamp: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + c.addSample(sample, labels) + } +} From 56a4915cc12ceb1f9781bc745fef9f3e36b934f0 Mon Sep 17 00:00:00 2001 From: Juraj Michalek Date: Thu, 3 Oct 2024 16:48:28 +0200 Subject: [PATCH 2/4] chore: updated initial test case --- pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go index 918705d5e237..9bc12017f304 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go @@ -22,5 +22,5 @@ func TestFromMetricsV2(t *testing.T) { tsMap, err := FromMetricsV2(payload.Metrics(), settings) require.NoError(t, err) - println(tsMap) + require.NotNil(t, tsMap) } From 6623b39a126547946c41896cb424e642c7ca1c14 Mon Sep 17 00:00:00 2001 From: Juraj Michalek Date: Sat, 5 Oct 2024 14:19:22 +0200 Subject: [PATCH 3/4] chore: added unit test for getOrCreateTimeSeries V2 Signed-off-by: Juraj Michalek --- .../prometheusremotewrite/helper_v2.go | 3 +- .../prometheusremotewrite/helper_v2_test.go | 83 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 pkg/translator/prometheusremotewrite/helper_v2_test.go diff --git a/pkg/translator/prometheusremotewrite/helper_v2.go b/pkg/translator/prometheusremotewrite/helper_v2.go index 4bb73dd7853d..dc125be44472 100644 --- a/pkg/translator/prometheusremotewrite/helper_v2.go +++ b/pkg/translator/prometheusremotewrite/helper_v2.go @@ -15,7 +15,6 @@ import ( "slices" ) -// TODO implement this fully // getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. // Otherwise it creates a new one and returns that, and true. func (c *prometheusConverterV2) getOrCreateTimeSeries(lbls labels.Labels) (*writev2.TimeSeries, bool) { @@ -47,7 +46,7 @@ func (c *prometheusConverterV2) getOrCreateTimeSeries(lbls labels.Labels) (*writ // This metric is new ts = &writev2.TimeSeries{} ts.LabelsRefs = c.symbolTable.SymbolizeLabels(lbls, ts.LabelsRefs) - c.unique[0] = ts + c.unique[h] = ts return ts, true } diff --git a/pkg/translator/prometheusremotewrite/helper_v2_test.go b/pkg/translator/prometheusremotewrite/helper_v2_test.go new file mode 100644 index 000000000000..035251dc4de1 --- /dev/null +++ b/pkg/translator/prometheusremotewrite/helper_v2_test.go @@ -0,0 +1,83 @@ +package prometheusremotewrite + +import ( + "github.com/prometheus/prometheus/model/labels" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/stretchr/testify/require" + "testing" +) + +func TestPrometheusConverter_getOrCreateTimeSeriesV2(t *testing.T) { + converter := newPrometheusConverterV2() + lbls := labels.Labels{ + labels.Label{ + Name: "key1", + Value: "value1", + }, + labels.Label{ + Name: "key2", + Value: "value2", + }, + } + ts, created := converter.getOrCreateTimeSeries(lbls) + require.NotNil(t, ts) + require.True(t, created) + + var b labels.ScratchBuilder + createdLabels := ts.ToLabels(&b, converter.symbolTable.Symbols()) + + // Now, get (not create) the unique time series + gotTS, created := converter.getOrCreateTimeSeries(createdLabels) + require.Same(t, ts, gotTS) + require.False(t, created) + + var keys []uint64 + for k := range converter.unique { + keys = append(keys, k) + } + require.Len(t, keys, 1) + h := keys[0] + + // Make sure that state is correctly set + require.Equal(t, map[uint64]*writev2.TimeSeries{ + h: ts, + }, converter.unique) + require.Empty(t, converter.conflicts) + + // Fake a hash collision, by making this not equal to the next series with the same hash + createdLabels = append(createdLabels, labels.Label{Name: "key3", Value: "value3"}) + ts.LabelsRefs = converter.symbolTable.SymbolizeLabels(createdLabels, ts.LabelsRefs) + + // Make the first hash collision + cTS1, created := converter.getOrCreateTimeSeries(lbls) + require.NotNil(t, cTS1) + require.True(t, created) + require.Equal(t, map[uint64][]*writev2.TimeSeries{ + h: {cTS1}, + }, converter.conflicts) + + // Fake a hash collision, by making this not equal to the next series with the same hash + createdLabels1 := cTS1.ToLabels(&b, converter.symbolTable.Symbols()) + createdLabels1 = append(createdLabels1, labels.Label{Name: "key3", Value: "value3"}) + cTS1.LabelsRefs = converter.symbolTable.SymbolizeLabels(createdLabels1, ts.LabelsRefs) + + // Make the second hash collision + cTS2, created := converter.getOrCreateTimeSeries(lbls) + require.NotNil(t, cTS2) + require.True(t, created) + require.Equal(t, map[uint64][]*writev2.TimeSeries{ + h: {cTS1, cTS2}, + }, converter.conflicts) + + // Now, get (not create) the second colliding time series + gotCTS2, created := converter.getOrCreateTimeSeries(lbls) + require.Same(t, cTS2, gotCTS2) + require.False(t, created) + require.Equal(t, map[uint64][]*writev2.TimeSeries{ + h: {cTS1, cTS2}, + }, converter.conflicts) + + require.Equal(t, map[uint64]*writev2.TimeSeries{ + h: ts, + }, converter.unique) +} From 22c9cd38963a9308f53b6ca248662320ec67af39 Mon Sep 17 00:00:00 2001 From: Juraj Michalek Date: Mon, 7 Oct 2024 19:45:11 +0200 Subject: [PATCH 4/4] chore: added unit test for createAttributesV2 and fixed it Signed-off-by: Juraj Michalek --- .../prometheusremotewrite/helper_v2.go | 9 +- .../prometheusremotewrite/helper_v2_test.go | 129 ++++++++++++++++++ .../prometheusremotewrite/testutils_test.go | 13 ++ 3 files changed, 146 insertions(+), 5 deletions(-) diff --git a/pkg/translator/prometheusremotewrite/helper_v2.go b/pkg/translator/prometheusremotewrite/helper_v2.go index dc125be44472..4a2c99ae8774 100644 --- a/pkg/translator/prometheusremotewrite/helper_v2.go +++ b/pkg/translator/prometheusremotewrite/helper_v2.go @@ -133,11 +133,10 @@ func createAttributesV2(resource pcommon.Resource, attributes pcommon.Map, exter l[name] = extras[i+1] } - // TODO what was this for?? - //labels = labels[:0] - //for k, v := range l { - // labels = append(labels, prompb.Label{Name: k, Value: v}) - //} + serieslabels = serieslabels[:0] + for k, v := range l { + serieslabels = append(serieslabels, labels.Label{Name: k, Value: v}) + } return serieslabels } diff --git a/pkg/translator/prometheusremotewrite/helper_v2_test.go b/pkg/translator/prometheusremotewrite/helper_v2_test.go index 035251dc4de1..d7a554851573 100644 --- a/pkg/translator/prometheusremotewrite/helper_v2_test.go +++ b/pkg/translator/prometheusremotewrite/helper_v2_test.go @@ -3,7 +3,9 @@ package prometheusremotewrite import ( "github.com/prometheus/prometheus/model/labels" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "testing" ) @@ -81,3 +83,130 @@ func TestPrometheusConverter_getOrCreateTimeSeriesV2(t *testing.T) { h: ts, }, converter.unique) } + +// Test_createLabelSet checks resultant label names are sanitized and label in extra overrides label in labels if +// collision happens. It does not check whether labels are not sorted +func Test_createLabelSetV2(t *testing.T) { + tests := []struct { + name string + resource pcommon.Resource + orig pcommon.Map + externalLabels map[string]string + extras []string + want labels.Labels + }{ + { + "labels_clean", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32), + }, + { + "labels_with_resource", + func() pcommon.Resource { + res := pcommon.NewResource() + res.Attributes().PutStr("service.name", "prometheus") + res.Attributes().PutStr("service.instance.id", "127.0.0.1:8080") + return res + }(), + lbs1, + map[string]string{}, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32, "job", "prometheus", "instance", "127.0.0.1:8080"), + }, + { + "labels_with_nonstring_resource", + func() pcommon.Resource { + res := pcommon.NewResource() + res.Attributes().PutInt("service.name", 12345) + res.Attributes().PutBool("service.instance.id", true) + return res + }(), + lbs1, + map[string]string{}, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32, "job", "12345", "instance", "true"), + }, + { + "labels_duplicate_in_extras", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{label11, value31}, + getPromLabelsV2(label11, value31, label12, value12), + }, + { + "labels_dirty", + pcommon.NewResource(), + lbs1Dirty, + map[string]string{}, + []string{label31 + dirty1, value31, label32, value32}, + getPromLabelsV2(label11+"_", value11, "key_"+label12, value12, label31+"_", value31, label32, value32), + }, + { + "no_original_case", + pcommon.NewResource(), + pcommon.NewMap(), + nil, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label31, value31, label32, value32), + }, + { + "empty_extra_case", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{"", ""}, + getPromLabelsV2(label11, value11, label12, value12, "", ""), + }, + { + "single_left_over_case", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{label31, value31, label32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31), + }, + { + "valid_external_labels", + pcommon.NewResource(), + lbs1, + exlbs1, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label41, value41, label31, value31, label32, value32), + }, + { + "overwritten_external_labels", + pcommon.NewResource(), + lbs1, + exlbs2, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32), + }, + { + "colliding attributes", + pcommon.NewResource(), + lbsColliding, + nil, + []string{label31, value31, label32, value32}, + getPromLabelsV2(collidingSanitized, value11+";"+value12, label31, value31, label32, value32), + }, + { + "sanitize_labels_starts_with_underscore", + pcommon.NewResource(), + lbs3, + exlbs1, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, "key"+label51, value51, label41, value41, label31, value31, label32, value32), + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := createAttributesV2(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...) + assert.ElementsMatch(t, tt.want, res) + }) + } +} diff --git a/pkg/translator/prometheusremotewrite/testutils_test.go b/pkg/translator/prometheusremotewrite/testutils_test.go index 49ef7a735081..82a7754012e8 100644 --- a/pkg/translator/prometheusremotewrite/testutils_test.go +++ b/pkg/translator/prometheusremotewrite/testutils_test.go @@ -5,6 +5,7 @@ package prometheusremotewrite import ( "encoding/hex" + "github.com/prometheus/prometheus/model/labels" "math" "strings" "testing" @@ -147,6 +148,18 @@ func getPromLabels(lbs ...string) []prompb.Label { return pbLbs.Labels } +// Prometheus TimeSeries +func getPromLabelsV2(lbs ...string) labels.Labels { + pbLbs := labels.Labels{} + for i := 0; i < len(lbs); i += 2 { + pbLbs = append(pbLbs, labels.Label{ + Name: lbs[i], + Value: lbs[i+1], + }) + } + return pbLbs +} + func getLabel(name string, value string) prompb.Label { return prompb.Label{ Name: name,