Skip to content

Commit e5fc693

Browse files
rapphilAneurysm9bryan-aguilar
authored
[exporter/prometheusremotewrite] Fix: Don't drop batch in case of failure to translate metrics (#29729)
Don't drop a whole batch in case of failure to translate from Otel to Prometheus. Instead, with this PR we are trying to send to Prometheus all the metrics that were properly translated and create a warning message for failures to translate. This PR also adds supports to telemetry in this component so that it is possible to inspect how the translation process is happening and identify failed translations. I opted to not include the number of time series that failed translation because I don't want to make assumptions about how the `FromMetrics` function works. Instead we are just publishing if there was any failure during the translation process and the number of time series returned. **Link to tracking Issue:** #15281 **Testing:** UTs were added to account for the case that you have mixed metrics, with some succeeding the translation and some failing. --------- Signed-off-by: Raphael Silva <[email protected]> Co-authored-by: Anthony Mirabella <[email protected]> Co-authored-by: bryan-aguilar <[email protected]> Co-authored-by: Bryan Aguilar <[email protected]>
1 parent ae8fde2 commit e5fc693

File tree

4 files changed

+156
-35
lines changed

4 files changed

+156
-35
lines changed

.chloggen/prw_failure_translate.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: prometheusremotewriteexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Publish telemetry about translation of metrics from Otel to Prometheus. Don't drop all data points if some fail translation.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29729]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/prometheusremotewriteexporter/exporter.go

+60-2
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,35 @@ import (
2525
"go.opentelemetry.io/collector/consumer/consumererror"
2626
"go.opentelemetry.io/collector/exporter"
2727
"go.opentelemetry.io/collector/pdata/pmetric"
28+
"go.opentelemetry.io/otel/attribute"
29+
"go.opentelemetry.io/otel/metric"
2830
"go.uber.org/multierr"
31+
"go.uber.org/zap"
2932

33+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata"
3034
prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
3135
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
3236
)
3337

38+
type prwTelemetry interface {
39+
recordTranslationFailure(ctx context.Context)
40+
recordTranslatedTimeSeries(ctx context.Context, numTS int)
41+
}
42+
43+
type prwTelemetryOtel struct {
44+
failedTranslations metric.Int64Counter
45+
translatedTimeSeries metric.Int64Counter
46+
otelAttrs []attribute.KeyValue
47+
}
48+
49+
func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) {
50+
p.failedTranslations.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...))
51+
}
52+
53+
func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) {
54+
p.translatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
55+
}
56+
3457
// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
3558
type prwExporter struct {
3659
endpointURL *url.URL
@@ -45,6 +68,31 @@ type prwExporter struct {
4568
retrySettings configretry.BackOffConfig
4669
wal *prweWAL
4770
exporterSettings prometheusremotewrite.Settings
71+
telemetry prwTelemetry
72+
}
73+
74+
func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) {
75+
76+
meter := metadata.Meter(set.TelemetrySettings)
77+
// TODO: create helper functions similar to the processor helper: BuildCustomMetricName
78+
prefix := "exporter/" + metadata.Type.String() + "/"
79+
failedTranslations, errFailedTranslation := meter.Int64Counter(prefix+"failed_translations",
80+
metric.WithDescription("Number of translation operations that failed to translate metrics from Otel to Prometheus"),
81+
metric.WithUnit("1"),
82+
)
83+
84+
translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_time_series",
85+
metric.WithDescription("Number of Prometheus time series that were translated from OTel metrics"),
86+
metric.WithUnit("1"),
87+
)
88+
89+
return &prwTelemetryOtel{
90+
failedTranslations: failedTranslations,
91+
translatedTimeSeries: translatedTimeSeries,
92+
otelAttrs: []attribute.KeyValue{
93+
attribute.String("exporter", set.ID.String()),
94+
},
95+
}, errors.Join(errFailedTranslation, errTranslatedMetrics)
4896
}
4997

5098
// newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
@@ -59,6 +107,11 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
59107
return nil, errors.New("invalid endpoint")
60108
}
61109

110+
prwTelemetry, err := newPRWTelemetry(set)
111+
if err != nil {
112+
return nil, err
113+
}
114+
62115
userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)
63116

64117
prwe := &prwExporter{
@@ -79,6 +132,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
79132
AddMetricSuffixes: cfg.AddMetricSuffixes,
80133
SendMetadata: cfg.SendMetadata,
81134
},
135+
telemetry: prwTelemetry,
82136
}
83137

84138
prwe.wal = newWAL(cfg.WAL, prwe.export)
@@ -128,15 +182,19 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
128182

129183
tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
130184
if err != nil {
131-
err = consumererror.NewPermanent(err)
185+
prwe.telemetry.recordTranslationFailure(ctx)
186+
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
132187
}
133188

189+
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
190+
134191
var m []*prompb.MetricMetadata
135192
if prwe.exporterSettings.SendMetadata {
136193
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
137194
}
195+
138196
// Call export even if a conversion error, since there may be points that were successfully converted.
139-
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
197+
return prwe.handleExport(ctx, tsMap, m)
140198
}
141199
}
142200

exporter/prometheusremotewriteexporter/exporter_test.go

+68-32
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,19 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
372372
return prwe.handleExport(context.Background(), testmap, nil)
373373
}
374374

375+
type mockPRWTelemetry struct {
376+
failedTranslations int
377+
translatedTimeSeries int
378+
}
379+
380+
func (m *mockPRWTelemetry) recordTranslationFailure(_ context.Context) {
381+
m.failedTranslations++
382+
}
383+
384+
func (m *mockPRWTelemetry) recordTranslatedTimeSeries(_ context.Context, numTs int) {
385+
m.translatedTimeSeries += numTs
386+
}
387+
375388
// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
376389
// expected
377390
func Test_PushMetrics(t *testing.T) {
@@ -420,6 +433,11 @@ func Test_PushMetrics(t *testing.T) {
420433

421434
emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])
422435

436+
// partial success (or partial failure) cases
437+
438+
partialSuccess1 := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum],
439+
validMetrics1[validIntGauge], validMetrics2[validIntGauge], invalidMetrics[emptyGauge])
440+
423441
// staleNaN cases
424442
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])
425443
staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram])
@@ -457,20 +475,23 @@ func Test_PushMetrics(t *testing.T) {
457475
}
458476

459477
tests := []struct {
460-
name string
461-
metrics pmetric.Metrics
462-
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
463-
expectedTimeSeries int
464-
httpResponseCode int
465-
returnErr bool
466-
isStaleMarker bool
467-
skipForWAL bool
478+
name string
479+
metrics pmetric.Metrics
480+
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
481+
expectedTimeSeries int
482+
httpResponseCode int
483+
returnErr bool
484+
isStaleMarker bool
485+
skipForWAL bool
486+
expectedFailedTranslations int
468487
}{
469488
{
470-
name: "invalid_type_case",
471-
metrics: invalidTypeBatch,
472-
httpResponseCode: http.StatusAccepted,
473-
returnErr: true,
489+
name: "invalid_type_case",
490+
metrics: invalidTypeBatch,
491+
httpResponseCode: http.StatusAccepted,
492+
reqTestFunc: checkFunc,
493+
expectedTimeSeries: 1, // the resource target metric.
494+
expectedFailedTranslations: 1,
474495
},
475496
{
476497
name: "intSum_case",
@@ -567,32 +588,40 @@ func Test_PushMetrics(t *testing.T) {
567588
skipForWAL: true,
568589
},
569590
{
570-
name: "emptyGauge_case",
571-
metrics: emptyDoubleGaugeBatch,
572-
reqTestFunc: checkFunc,
573-
httpResponseCode: http.StatusAccepted,
574-
returnErr: true,
591+
name: "emptyGauge_case",
592+
metrics: emptyDoubleGaugeBatch,
593+
reqTestFunc: checkFunc,
594+
httpResponseCode: http.StatusAccepted,
595+
expectedFailedTranslations: 1,
596+
},
597+
{
598+
name: "emptyCumulativeSum_case",
599+
metrics: emptyCumulativeSumBatch,
600+
reqTestFunc: checkFunc,
601+
httpResponseCode: http.StatusAccepted,
602+
expectedFailedTranslations: 1,
575603
},
576604
{
577-
name: "emptyCumulativeSum_case",
578-
metrics: emptyCumulativeSumBatch,
579-
reqTestFunc: checkFunc,
580-
httpResponseCode: http.StatusAccepted,
581-
returnErr: true,
605+
name: "emptyCumulativeHistogram_case",
606+
metrics: emptyCumulativeHistogramBatch,
607+
reqTestFunc: checkFunc,
608+
httpResponseCode: http.StatusAccepted,
609+
expectedFailedTranslations: 1,
582610
},
583611
{
584-
name: "emptyCumulativeHistogram_case",
585-
metrics: emptyCumulativeHistogramBatch,
586-
reqTestFunc: checkFunc,
587-
httpResponseCode: http.StatusAccepted,
588-
returnErr: true,
612+
name: "emptySummary_case",
613+
metrics: emptySummaryBatch,
614+
reqTestFunc: checkFunc,
615+
httpResponseCode: http.StatusAccepted,
616+
expectedFailedTranslations: 1,
589617
},
590618
{
591-
name: "emptySummary_case",
592-
metrics: emptySummaryBatch,
593-
reqTestFunc: checkFunc,
594-
httpResponseCode: http.StatusAccepted,
595-
returnErr: true,
619+
name: "partialSuccess_case",
620+
metrics: partialSuccess1,
621+
reqTestFunc: checkFunc,
622+
httpResponseCode: http.StatusAccepted,
623+
expectedTimeSeries: 4,
624+
expectedFailedTranslations: 1,
596625
},
597626
{
598627
name: "staleNaNIntGauge_case",
@@ -668,6 +697,7 @@ func Test_PushMetrics(t *testing.T) {
668697
}
669698
t.Run(tt.name, func(t *testing.T) {
670699
t.Parallel()
700+
mockTelemetry := &mockPRWTelemetry{}
671701
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
672702
if tt.reqTestFunc != nil {
673703
tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
@@ -716,7 +746,10 @@ func Test_PushMetrics(t *testing.T) {
716746
}
717747
set := exportertest.NewNopCreateSettings()
718748
set.BuildInfo = buildInfo
749+
719750
prwe, nErr := newPRWExporter(cfg, set)
751+
prwe.telemetry = mockTelemetry
752+
720753
require.NoError(t, nErr)
721754
ctx, cancel := context.WithCancel(context.Background())
722755
defer cancel()
@@ -729,6 +762,9 @@ func Test_PushMetrics(t *testing.T) {
729762
assert.Error(t, err)
730763
return
731764
}
765+
766+
assert.Equal(t, tt.expectedFailedTranslations, mockTelemetry.failedTranslations)
767+
assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries)
732768
assert.NoError(t, err)
733769
})
734770
}

exporter/prometheusremotewriteexporter/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ require (
2323
go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6
2424
go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6
2525
go.opentelemetry.io/collector/pdata v1.3.1-0.20240306115632-b2693620eff6
26+
go.opentelemetry.io/otel v1.24.0
2627
go.opentelemetry.io/otel/metric v1.24.0
2728
go.opentelemetry.io/otel/trace v1.24.0
2829
go.uber.org/goleak v1.3.0
@@ -70,7 +71,6 @@ require (
7071
go.opentelemetry.io/collector/receiver v0.96.1-0.20240306115632-b2693620eff6 // indirect
7172
go.opentelemetry.io/collector/semconv v0.96.1-0.20240306115632-b2693620eff6 // indirect
7273
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
73-
go.opentelemetry.io/otel v1.24.0 // indirect
7474
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
7575
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
7676
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect

0 commit comments

Comments
 (0)