Skip to content

Commit 52abb90

Browse files
authored
Use consumerhelper for exporterhelper, add WithCapabilities (#3186)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent a96e010 commit 52abb90

File tree

9 files changed

+135
-91
lines changed

9 files changed

+135
-91
lines changed

exporter/exporterhelper/common.go

+17-10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"go.opentelemetry.io/collector/component"
2424
"go.opentelemetry.io/collector/component/componenthelper"
2525
"go.opentelemetry.io/collector/config"
26+
"go.opentelemetry.io/collector/consumer"
27+
"go.opentelemetry.io/collector/consumer/consumerhelper"
2628
)
2729

2830
// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
@@ -73,14 +75,15 @@ func (req *baseRequest) setContext(ctx context.Context) {
7375
// baseSettings represents all the options that users can configure.
7476
type baseSettings struct {
7577
componentOptions []componenthelper.Option
78+
consumerOptions []consumerhelper.Option
7679
TimeoutSettings
7780
QueueSettings
7881
RetrySettings
7982
ResourceToTelemetrySettings
8083
}
8184

8285
// fromOptions returns the internal options starting from the default and applying all configured options.
83-
func fromOptions(options []Option) *baseSettings {
86+
func fromOptions(options ...Option) *baseSettings {
8487
// Start from the default options:
8588
opts := &baseSettings{
8689
TimeoutSettings: DefaultTimeoutSettings(),
@@ -141,6 +144,15 @@ func WithQueue(queueSettings QueueSettings) Option {
141144
}
142145
}
143146

147+
// WithCapabilities overrides the default Capabilities() function for a Consumer.
148+
// The default is non-mutable data.
149+
// TODO: Verify if we can change the default to be mutable as we do for processors.
150+
func WithCapabilities(capabilities consumer.Capabilities) Option {
151+
return func(o *baseSettings) {
152+
o.consumerOptions = append(o.consumerOptions, consumerhelper.WithCapabilities(capabilities))
153+
}
154+
}
155+
144156
// WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter.
145157
// The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion.
146158
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option {
@@ -152,18 +164,13 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
152164
// baseExporter contains common fields between different exporter types.
153165
type baseExporter struct {
154166
component.Component
155-
cfg config.Exporter
156-
sender requestSender
157-
qrSender *queuedRetrySender
158-
convertResourceToTelemetry bool
167+
sender requestSender
168+
qrSender *queuedRetrySender
159169
}
160170

161-
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
162-
bs := fromOptions(options)
171+
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) *baseExporter {
163172
be := &baseExporter{
164-
Component: componenthelper.New(bs.componentOptions...),
165-
cfg: cfg,
166-
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
173+
Component: componenthelper.New(bs.componentOptions...),
167174
}
168175

169176
be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)

exporter/exporterhelper/common_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestErrorToStatus(t *testing.T) {
4444
}
4545

4646
func TestBaseExporter(t *testing.T) {
47-
be := newBaseExporter(&defaultExporterCfg, zap.NewNop())
47+
be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions())
4848
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
4949
require.NoError(t, be.Shutdown(context.Background()))
5050
}
@@ -54,10 +54,11 @@ func TestBaseExporterWithOptions(t *testing.T) {
5454
be := newBaseExporter(
5555
&defaultExporterCfg,
5656
zap.NewNop(),
57-
WithStart(func(ctx context.Context, host component.Host) error { return want }),
58-
WithShutdown(func(ctx context.Context) error { return want }),
59-
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
60-
WithTimeout(DefaultTimeoutSettings()),
57+
fromOptions(
58+
WithStart(func(ctx context.Context, host component.Host) error { return want }),
59+
WithShutdown(func(ctx context.Context) error { return want }),
60+
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
61+
WithTimeout(DefaultTimeoutSettings())),
6162
)
6263
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
6364
require.Equal(t, want, be.Shutdown(context.Background()))

exporter/exporterhelper/logshelper.go renamed to exporter/exporterhelper/logs.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,7 @@ func (req *logsRequest) count() int {
6161

6262
type logsExporter struct {
6363
*baseExporter
64-
pusher consumerhelper.ConsumeLogsFunc
65-
}
66-
67-
func (lexp *logsExporter) Capabilities() consumer.Capabilities {
68-
return consumer.Capabilities{MutatesData: false}
69-
}
70-
71-
func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
72-
return lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher))
64+
consumer.Logs
7365
}
7466

7567
// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
@@ -91,7 +83,8 @@ func NewLogsExporter(
9183
return nil, errNilPushLogsData
9284
}
9385

94-
be := newBaseExporter(cfg, logger, options...)
86+
bs := fromOptions(options...)
87+
be := newBaseExporter(cfg, logger, bs)
9588
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
9689
return &logsExporterWithObservability{
9790
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
@@ -102,10 +95,14 @@ func NewLogsExporter(
10295
}
10396
})
10497

98+
lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
99+
return be.sender.send(newLogsRequest(ctx, ld, pusher))
100+
}, bs.consumerOptions...)
101+
105102
return &logsExporter{
106103
baseExporter: be,
107-
pusher: pusher,
108-
}, nil
104+
Logs: lc,
105+
}, err
109106
}
110107

111108
type logsExporterWithObservability struct {

exporter/exporterhelper/logshelper_test.go renamed to exporter/exporterhelper/logs_test.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import (
2424
"go.uber.org/zap"
2525

2626
"go.opentelemetry.io/collector/component"
27+
"go.opentelemetry.io/collector/component/componenttest"
2728
"go.opentelemetry.io/collector/config"
29+
"go.opentelemetry.io/collector/consumer"
2830
"go.opentelemetry.io/collector/consumer/consumererror"
2931
"go.opentelemetry.io/collector/consumer/consumerhelper"
3032
"go.opentelemetry.io/collector/consumer/pdata"
@@ -78,22 +80,33 @@ func TestLogsExporter_Default(t *testing.T) {
7880
assert.NotNil(t, le)
7981
assert.NoError(t, err)
8082

81-
assert.Nil(t, le.ConsumeLogs(context.Background(), ld))
82-
assert.Nil(t, le.Shutdown(context.Background()))
83+
assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities())
84+
assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost()))
85+
assert.NoError(t, le.ConsumeLogs(context.Background(), ld))
86+
assert.NoError(t, le.Shutdown(context.Background()))
87+
}
88+
89+
func TestLogsExporter_WithCapabilities(t *testing.T) {
90+
capabilities := consumer.Capabilities{MutatesData: true}
91+
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithCapabilities(capabilities))
92+
require.NoError(t, err)
93+
require.NotNil(t, le)
94+
95+
assert.Equal(t, capabilities, le.Capabilities())
8396
}
8497

8598
func TestLogsExporter_Default_ReturnError(t *testing.T) {
8699
ld := testdata.GenerateLogDataEmpty()
87100
want := errors.New("my_error")
88101
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want))
89-
require.Nil(t, err)
102+
require.NoError(t, err)
90103
require.NotNil(t, le)
91104
require.Equal(t, want, le.ConsumeLogs(context.Background(), ld))
92105
}
93106

94107
func TestLogsExporter_WithRecordLogs(t *testing.T) {
95108
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
96-
require.Nil(t, err)
109+
require.NoError(t, err)
97110
require.NotNil(t, le)
98111

99112
checkRecordedMetricsForLogsExporter(t, le, nil)

exporter/exporterhelper/metricshelper.go renamed to exporter/exporterhelper/metrics.go

+12-15
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,7 @@ func (req *metricsRequest) count() int {
6262

6363
type metricsExporter struct {
6464
*baseExporter
65-
pusher consumerhelper.ConsumeMetricsFunc
66-
}
67-
68-
func (mexp *metricsExporter) Capabilities() consumer.Capabilities {
69-
return consumer.Capabilities{MutatesData: false}
70-
}
71-
72-
func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
73-
if mexp.baseExporter.convertResourceToTelemetry {
74-
md = convertResourceToLabels(md)
75-
}
76-
return mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher))
65+
consumer.Metrics
7766
}
7867

7968
// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
@@ -95,7 +84,8 @@ func NewMetricsExporter(
9584
return nil, errNilPushMetricsData
9685
}
9786

98-
be := newBaseExporter(cfg, logger, options...)
87+
bs := fromOptions(options...)
88+
be := newBaseExporter(cfg, logger, bs)
9989
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
10090
return &metricsSenderWithObservability{
10191
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
@@ -106,10 +96,17 @@ func NewMetricsExporter(
10696
}
10797
})
10898

99+
mc, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error {
100+
if bs.ResourceToTelemetrySettings.Enabled {
101+
md = convertResourceToLabels(md)
102+
}
103+
return be.sender.send(newMetricsRequest(ctx, md, pusher))
104+
}, bs.consumerOptions...)
105+
109106
return &metricsExporter{
110107
baseExporter: be,
111-
pusher: pusher,
112-
}, nil
108+
Metrics: mc,
109+
}, err
113110
}
114111

115112
type metricsSenderWithObservability struct {

exporter/exporterhelper/metricshelper_test.go renamed to exporter/exporterhelper/metrics_test.go

+31-14
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import (
2424
"go.uber.org/zap"
2525

2626
"go.opentelemetry.io/collector/component"
27+
"go.opentelemetry.io/collector/component/componenttest"
2728
"go.opentelemetry.io/collector/config"
29+
"go.opentelemetry.io/collector/consumer"
2830
"go.opentelemetry.io/collector/consumer/consumererror"
2931
"go.opentelemetry.io/collector/consumer/consumerhelper"
3032
"go.opentelemetry.io/collector/consumer/pdata"
@@ -74,25 +76,36 @@ func TestMetricsExporter_NilPushMetricsData(t *testing.T) {
7476
func TestMetricsExporter_Default(t *testing.T) {
7577
md := testdata.GenerateMetricsEmpty()
7678
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
79+
assert.NoError(t, err)
7780
assert.NotNil(t, me)
81+
82+
assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities())
83+
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
84+
assert.NoError(t, me.ConsumeMetrics(context.Background(), md))
85+
assert.NoError(t, me.Shutdown(context.Background()))
86+
}
87+
88+
func TestMetricsExporter_WithCapabilities(t *testing.T) {
89+
capabilities := consumer.Capabilities{MutatesData: true}
90+
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithCapabilities(capabilities))
7891
assert.NoError(t, err)
92+
assert.NotNil(t, me)
7993

80-
assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
81-
assert.Nil(t, me.Shutdown(context.Background()))
94+
assert.Equal(t, capabilities, me.Capabilities())
8295
}
8396

8497
func TestMetricsExporter_Default_ReturnError(t *testing.T) {
8598
md := testdata.GenerateMetricsEmpty()
8699
want := errors.New("my_error")
87100
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
88-
require.Nil(t, err)
101+
require.NoError(t, err)
89102
require.NotNil(t, me)
90103
require.Equal(t, want, me.ConsumeMetrics(context.Background(), md))
91104
}
92105

93106
func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
94107
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
95-
require.Nil(t, err)
108+
require.NoError(t, err)
96109
require.NotNil(t, me)
97110

98111
checkRecordedMetricsForMetricsExporter(t, me, nil)
@@ -101,23 +114,23 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
101114
func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
102115
want := errors.New("my_error")
103116
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
104-
require.Nil(t, err)
117+
require.NoError(t, err)
105118
require.NotNil(t, me)
106119

107120
checkRecordedMetricsForMetricsExporter(t, me, want)
108121
}
109122

110123
func TestMetricsExporter_WithSpan(t *testing.T) {
111124
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
112-
require.Nil(t, err)
125+
require.NoError(t, err)
113126
require.NotNil(t, me)
114127
checkWrapSpanForMetricsExporter(t, me, nil, 1)
115128
}
116129

117130
func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
118131
want := errors.New("my_error")
119132
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
120-
require.Nil(t, err)
133+
require.NoError(t, err)
121134
require.NotNil(t, me)
122135
checkWrapSpanForMetricsExporter(t, me, want, 1)
123136
}
@@ -130,7 +143,8 @@ func TestMetricsExporter_WithShutdown(t *testing.T) {
130143
assert.NotNil(t, me)
131144
assert.NoError(t, err)
132145

133-
assert.Nil(t, me.Shutdown(context.Background()))
146+
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
147+
assert.NoError(t, me.Shutdown(context.Background()))
134148
assert.True(t, shutdownCalled)
135149
}
136150

@@ -140,18 +154,20 @@ func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T)
140154
assert.NotNil(t, me)
141155
assert.NoError(t, err)
142156

143-
assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
144-
assert.Nil(t, me.Shutdown(context.Background()))
157+
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
158+
assert.NoError(t, me.ConsumeMetrics(context.Background(), md))
159+
assert.NoError(t, me.Shutdown(context.Background()))
145160
}
146161

147-
func TestMetricsExporter_WithResourceToTelemetryConversionEbabled(t *testing.T) {
162+
func TestMetricsExporter_WithResourceToTelemetryConversionEnabled(t *testing.T) {
148163
md := testdata.GenerateMetricsTwoMetrics()
149164
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true}))
150165
assert.NotNil(t, me)
151166
assert.NoError(t, err)
152167

153-
assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
154-
assert.Nil(t, me.Shutdown(context.Background()))
168+
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
169+
assert.NoError(t, me.ConsumeMetrics(context.Background(), md))
170+
assert.NoError(t, me.Shutdown(context.Background()))
155171
}
156172

157173
func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
@@ -162,7 +178,8 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
162178
assert.NotNil(t, me)
163179
assert.NoError(t, err)
164180

165-
assert.Equal(t, me.Shutdown(context.Background()), want)
181+
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
182+
assert.Equal(t, want, me.Shutdown(context.Background()))
166183
}
167184

168185
func newPushMetricsData(retError error) consumerhelper.ConsumeMetricsFunc {

0 commit comments

Comments
 (0)