Skip to content

Commit b4b1968

Browse files
committed
[exporterhelper] Add data_type attribute to internal queue metric
Add data_type attribute to the internal otelcol_exporter_queue_size metric to report the type of data being processed. All other metrics have the data type reported as part of their names. We could've done the same for queue metrics, but that would introduce a significant breaking change. We want to avoid that until we have all the metrics standardized with OpenTelemetry semantic conventions.
1 parent 21c3f36 commit b4b1968

File tree

10 files changed

+98
-45
lines changed

10 files changed

+98
-45
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: breaking
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: component/componenttest
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add optional ...attribute.KeyValue argument to TestTelemetry.CheckExporterMetricGauge.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [10593]
12+
13+
# Optional: The change log or logs in which this entry should be included.
14+
# e.g. '[user]' or '[user, api]'
15+
# Include 'user' if the change is relevant to end users.
16+
# Include 'api' if there is a change to a library API.
17+
# Default: '[user]'
18+
change_logs: [api]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add data_type attribute to `otelcol_exporter_queue_size` metric to report the type of data being processed.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [9943]
14+
15+
# Optional: The change log or logs in which this entry should be included.
16+
# e.g. '[user]' or '[user, api]'
17+
# Include 'user' if the change is relevant to end users.
18+
# Include 'api' if there is a change to a library API.
19+
# Default: '[user]'
20+
change_logs: [user]

component/componenttest/obsreporttest.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/prometheus/client_golang/prometheus"
1010
"github.com/prometheus/client_golang/prometheus/promhttp"
11+
"go.opentelemetry.io/otel/attribute"
1112
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
1213
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1314
"go.opentelemetry.io/otel/sdk/resource"
@@ -72,8 +73,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords
7273
return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords)
7374
}
7475

75-
func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error {
76-
return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val)
76+
func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, extraAttrs ...attribute.KeyValue) error {
77+
attrs := attributesForExporterMetrics(tts.id)
78+
attrs = append(attrs, extraAttrs...)
79+
return tts.prometheusChecker.checkGauge(metric, val, attrs)
7780
}
7881

7982
// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.

component/componenttest/otelprometheuschecker.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,8 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d
100100
return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs)
101101
}
102102

103-
func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
104-
exporterAttrs := attributesForExporterMetrics(exporter)
105-
106-
ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs)
103+
func (pc *prometheusChecker) checkGauge(metric string, val int64, attrs []attribute.KeyValue) error {
104+
ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attrs)
107105
if err != nil {
108106
return err
109107
}

exporter/exporterhelper/common.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option {
110110
NumConsumers: config.NumConsumers,
111111
QueueSize: config.QueueSize,
112112
})
113-
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
113+
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
114114
return nil
115115
}
116116
}
@@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
132132
DataType: o.signal,
133133
ExporterSettings: o.set,
134134
}
135-
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
135+
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep)
136136
return nil
137137
}
138138
}
@@ -250,7 +250,7 @@ type baseExporter struct {
250250
}
251251

252252
func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
253-
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
253+
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set, DataType: signal})
254254
if err != nil {
255255
return nil, err
256256
}

exporter/exporterhelper/obsexporter.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ObsReport struct {
2626
level configtelemetry.Level
2727
spanNamePrefix string
2828
tracer trace.Tracer
29+
dataType component.DataType
2930

3031
otelAttrs []attribute.KeyValue
3132
telemetryBuilder *metadata.TelemetryBuilder
@@ -38,6 +39,7 @@ type ObsReport struct {
3839
type ObsReportSettings struct {
3940
ExporterID component.ID
4041
ExporterCreateSettings exporter.Settings
42+
DataType component.DataType
4143
}
4244

4345
// NewObsReport creates a new Exporter.
@@ -58,7 +60,7 @@ func newExporter(cfg ObsReportSettings) (*ObsReport, error) {
5860
level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel,
5961
spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(),
6062
tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()),
61-
63+
dataType: cfg.DataType,
6264
otelAttrs: []attribute.KeyValue{
6365
attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()),
6466
},

exporter/exporterhelper/obsreport_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
)
1616

1717
func TestExportEnqueueFailure(t *testing.T) {
18-
exporterID := component.MustNewID("fakeExporter")
1918
tt, err := componenttest.SetupTelemetry(exporterID)
2019
require.NoError(t, err)
2120
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

exporter/exporterhelper/queue_sender.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/exporter"
18-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1918
"go.opentelemetry.io/collector/exporter/exporterqueue"
2019
"go.opentelemetry.io/collector/exporter/internal/queue"
2120
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
@@ -74,18 +73,18 @@ type queueSender struct {
7473
traceAttribute attribute.KeyValue
7574
consumers *queue.Consumers[Request]
7675

77-
telemetryBuilder *metadata.TelemetryBuilder
78-
exporterID component.ID
76+
obsrep *ObsReport
77+
exporterID component.ID
7978
}
8079

8180
func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int,
82-
exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender {
81+
exportFailureMessage string, obsrep *ObsReport) *queueSender {
8382
qs := &queueSender{
84-
queue: q,
85-
numConsumers: numConsumers,
86-
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
87-
telemetryBuilder: telemetryBuilder,
88-
exporterID: set.ID,
83+
queue: q,
84+
numConsumers: numConsumers,
85+
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
86+
obsrep: obsrep,
87+
exporterID: set.ID,
8988
}
9089
consumeFunc := func(ctx context.Context, req Request) error {
9190
err := qs.nextSender.send(ctx, req)
@@ -105,10 +104,12 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
105104
return err
106105
}
107106

108-
opts := metric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.exporterID.String())))
107+
dataTypeAttr := attribute.String(obsmetrics.DataTypeKey, qs.obsrep.dataType.String())
109108
return multierr.Append(
110-
qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, opts),
111-
qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, opts),
109+
qs.obsrep.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
110+
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))),
111+
qs.obsrep.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
112+
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))),
112113
)
113114
}
114115

exporter/exporterhelper/queue_sender_test.go

+31-22
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@ import (
1111

1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/otel/attribute"
1415
"go.uber.org/zap"
1516
"go.uber.org/zap/zaptest/observer"
1617

1718
"go.opentelemetry.io/collector/component"
1819
"go.opentelemetry.io/collector/component/componenttest"
1920
"go.opentelemetry.io/collector/config/configretry"
2021
"go.opentelemetry.io/collector/exporter"
21-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
2222
"go.opentelemetry.io/collector/exporter/exporterqueue"
2323
"go.opentelemetry.io/collector/exporter/exportertest"
2424
"go.opentelemetry.io/collector/exporter/internal/queue"
25+
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
2526
)
2627

2728
func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
@@ -202,28 +203,33 @@ func TestQueuedRetryHappyPath(t *testing.T) {
202203
})
203204
}
204205
}
205-
func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
206-
tt, err := componenttest.SetupTelemetry(defaultID)
207-
require.NoError(t, err)
208206

209-
qCfg := NewDefaultQueueSettings()
210-
qCfg.NumConsumers = 0 // to make every request go straight to the queue
211-
rCfg := configretry.NewDefaultBackOffConfig()
212-
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
213-
be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender,
214-
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
215-
WithRetry(rCfg), WithQueue(qCfg))
216-
require.NoError(t, err)
217-
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
218-
219-
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
207+
func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
208+
dataTypes := []component.DataType{component.DataTypeLogs, component.DataTypeTraces, component.DataTypeMetrics}
209+
for _, dataType := range dataTypes {
210+
tt, err := componenttest.SetupTelemetry(defaultID)
211+
require.NoError(t, err)
212+
213+
qCfg := NewDefaultQueueSettings()
214+
qCfg.NumConsumers = 0 // to make every request go straight to the queue
215+
rCfg := configretry.NewDefaultBackOffConfig()
216+
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
217+
be, err := newBaseExporter(set, dataType, newObservabilityConsumerSender,
218+
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
219+
WithRetry(rCfg), WithQueue(qCfg))
220+
require.NoError(t, err)
221+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
222+
223+
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
224+
225+
for i := 0; i < 7; i++ {
226+
require.NoError(t, be.send(context.Background(), newErrorRequest()))
227+
}
228+
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
229+
attribute.String(obsmetrics.DataTypeKey, dataType.String())))
220230

221-
for i := 0; i < 7; i++ {
222-
require.NoError(t, be.send(context.Background(), newErrorRequest()))
231+
assert.NoError(t, be.Shutdown(context.Background()))
223232
}
224-
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7)))
225-
226-
assert.NoError(t, be.Shutdown(context.Background()))
227233
}
228234

229235
func TestNoCancellationContext(t *testing.T) {
@@ -426,9 +432,12 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
426432
func TestQueueSenderNoStartShutdown(t *testing.T) {
427433
queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{})
428434
set := exportertest.NewNopSettings()
429-
builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
435+
obsrep, err := NewObsReport(ObsReportSettings{
436+
ExporterID: exporterID,
437+
ExporterCreateSettings: exportertest.NewNopSettings(),
438+
})
430439
assert.NoError(t, err)
431-
qs := newQueueSender(queue, set, 1, "", builder)
440+
qs := newQueueSender(queue, set, 1, "", obsrep)
432441
assert.NoError(t, qs.Shutdown(context.Background()))
433442
}
434443

internal/obsreportconfig/obsmetrics/obs_exporter.go

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ const (
77
// ExporterKey used to identify exporters in metrics and traces.
88
ExporterKey = "exporter"
99

10+
// DataTypeKey used to identify the data type in the queue size metric.
11+
DataTypeKey = "data_type"
12+
1013
// SentSpansKey used to track spans sent by exporters.
1114
SentSpansKey = "sent_spans"
1215
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.

0 commit comments

Comments
 (0)