Skip to content

Commit ed9edaa

Browse files
author
Tigran Najaryan
committed
Move ReceiverFactory to "receiver" package.
This is part 1 of 3 of the refactoring plan to eliminate factories.go and move the content to corresponding packages (receiver, exporter, processor).
1 parent a61a358 commit ed9edaa

File tree

27 files changed

+262
-204
lines changed

27 files changed

+262
-204
lines changed

cmd/occollector/app/builder/exporters_builder.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (eb *ExportersBuilder) buildExporter(
170170
// Traces data type is required. Create a trace exporter based on config.
171171
tc, stopFunc, err := factory.CreateTraceExporter(eb.logger, config)
172172
if err != nil {
173-
if err == factories.ErrDataTypeIsNotSupported {
173+
if err == models.ErrDataTypeIsNotSupported {
174174
// Could not create because this exporter does not support this data type.
175175
return nil, typeMismatchErr(config, requirement.requiredBy, models.TracesDataType)
176176
}
@@ -185,7 +185,7 @@ func (eb *ExportersBuilder) buildExporter(
185185
// Metrics data type is required. Create a trace exporter based on config.
186186
mc, stopFunc, err := factory.CreateMetricsExporter(eb.logger, config)
187187
if err != nil {
188-
if err == factories.ErrDataTypeIsNotSupported {
188+
if err == models.ErrDataTypeIsNotSupported {
189189
// Could not create because this exporter does not support this data type.
190190
return nil, typeMismatchErr(config, requirement.requiredBy, models.MetricsDataType)
191191
}

cmd/occollector/app/builder/receivers_builder.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"go.uber.org/zap"
2222

2323
"github.com/open-telemetry/opentelemetry-service/consumer"
24-
"github.com/open-telemetry/opentelemetry-service/factories"
2524
"github.com/open-telemetry/opentelemetry-service/internal"
2625
"github.com/open-telemetry/opentelemetry-service/models"
2726
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
@@ -172,10 +171,10 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config models.Receiver) (attac
172171
}
173172

174173
func (rb *ReceiversBuilder) attachReceiverToPipelines(
175-
factory factories.ReceiverFactory,
174+
factory receiver.Factory,
176175
dataType models.DataType,
177176
config models.Receiver,
178-
receiver *builtReceiver,
177+
rcv *builtReceiver,
179178
pipelineProcessors []*builtProcessor,
180179
) error {
181180
// There are pipelines of the specified data type that must be attached to
@@ -188,15 +187,15 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
188187
junction := buildFanoutTraceConsumer(pipelineProcessors)
189188

190189
// Now create the receiver and tell it to send to the junction point.
191-
receiver.trace, err = factory.CreateTraceReceiver(context.Background(), rb.logger, config, junction)
190+
rcv.trace, err = factory.CreateTraceReceiver(context.Background(), rb.logger, config, junction)
192191

193192
case models.MetricsDataType:
194193
junction := buildFanoutMetricConsumer(pipelineProcessors)
195-
receiver.metrics, err = factory.CreateMetricsReceiver(rb.logger, config, junction)
194+
rcv.metrics, err = factory.CreateMetricsReceiver(rb.logger, config, junction)
196195
}
197196

198197
if err != nil {
199-
if err == factories.ErrDataTypeIsNotSupported {
198+
if err == models.ErrDataTypeIsNotSupported {
200199
return fmt.Errorf(
201200
"receiver %s does not support %s but it was used in a "+
202201
"%s pipeline",
@@ -222,8 +221,8 @@ func (rb *ReceiversBuilder) buildReceiver(config models.Receiver) (*builtReceive
222221
}
223222

224223
// Prepare to build the receiver.
225-
factory := factories.GetReceiverFactory(config.Type())
226-
receiver := &builtReceiver{}
224+
factory := receiver.GetReceiverFactory(config.Type())
225+
rcv := &builtReceiver{}
227226

228227
// Now we have list of pipelines broken down by data type. Iterate for each data type.
229228
for dataType, pipelines := range pipelinesToAttach {
@@ -234,13 +233,13 @@ func (rb *ReceiversBuilder) buildReceiver(config models.Receiver) (*builtReceive
234233

235234
// Attach the corresponding part of the receiver to all pipelines that require
236235
// this data type.
237-
err := rb.attachReceiverToPipelines(factory, dataType, config, receiver, pipelines)
236+
err := rb.attachReceiverToPipelines(factory, dataType, config, rcv, pipelines)
238237
if err != nil {
239238
return nil, err
240239
}
241240
}
242241

243-
return receiver, nil
242+
return rcv, nil
244243
}
245244

246245
func buildFanoutTraceConsumer(pipelineFrontProcessors []*builtProcessor) consumer.TraceConsumer {

configv2/configv2.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/open-telemetry/opentelemetry-service/factories"
2828
"github.com/open-telemetry/opentelemetry-service/models"
29+
"github.com/open-telemetry/opentelemetry-service/receiver"
2930
)
3031

3132
// These are errors that can be returned by Load(). Note that error codes are not part
@@ -181,7 +182,7 @@ func loadReceivers(v *viper.Viper) (models.Receivers, error) {
181182
}
182183

183184
// Find receiver factory based on "type" that we read from config source
184-
factory := factories.GetReceiverFactory(typeStr)
185+
factory := receiver.GetReceiverFactory(typeStr)
185186
if factory == nil {
186187
return nil, &configError{
187188
code: errUnknownReceiverType,

configv2/example_factories.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (f *ExampleReceiverFactory) Type() string {
4949
}
5050

5151
// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this factory.
52-
func (f *ExampleReceiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler {
52+
func (f *ExampleReceiverFactory) CustomUnmarshaler() receiver.CustomUnmarshaler {
5353
return nil
5454
}
5555

@@ -73,7 +73,7 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver(
7373
nextConsumer consumer.TraceConsumer,
7474
) (receiver.TraceReceiver, error) {
7575
if cfg.(*ExampleReceiver).FailTraceCreation {
76-
return nil, factories.ErrDataTypeIsNotSupported
76+
return nil, models.ErrDataTypeIsNotSupported
7777
}
7878
return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil
7979
}
@@ -85,7 +85,7 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
8585
nextConsumer consumer.MetricsConsumer,
8686
) (receiver.MetricsReceiver, error) {
8787
if cfg.(*ExampleReceiver).FailMetricsCreation {
88-
return nil, factories.ErrDataTypeIsNotSupported
88+
return nil, models.ErrDataTypeIsNotSupported
8989
}
9090
return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil
9191
}
@@ -181,7 +181,7 @@ func (f *MultiProtoReceiverFactory) Type() string {
181181
}
182182

183183
// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this factory.
184-
func (f *MultiProtoReceiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler {
184+
func (f *MultiProtoReceiverFactory) CustomUnmarshaler() receiver.CustomUnmarshaler {
185185
return nil
186186
}
187187

@@ -311,7 +311,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor(
311311
nextConsumer consumer.TraceConsumer,
312312
cfg models.Processor,
313313
) (processor.TraceProcessor, error) {
314-
return nil, factories.ErrDataTypeIsNotSupported
314+
return nil, models.ErrDataTypeIsNotSupported
315315
}
316316

317317
// CreateMetricsProcessor creates a metrics processor based on this config.
@@ -320,13 +320,13 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor(
320320
nextConsumer consumer.MetricsConsumer,
321321
cfg models.Processor,
322322
) (processor.MetricsProcessor, error) {
323-
return nil, factories.ErrDataTypeIsNotSupported
323+
return nil, models.ErrDataTypeIsNotSupported
324324
}
325325

326326
// RegisterTestFactories registers example factories. This is only used by tests.
327327
func RegisterTestFactories() error {
328-
_ = factories.RegisterReceiverFactory(&ExampleReceiverFactory{})
329-
_ = factories.RegisterReceiverFactory(&MultiProtoReceiverFactory{})
328+
_ = receiver.RegisterReceiverFactory(&ExampleReceiverFactory{})
329+
_ = receiver.RegisterReceiverFactory(&MultiProtoReceiverFactory{})
330330
_ = factories.RegisterExporterFactory(&ExampleExporterFactory{})
331331
_ = factories.RegisterProcessorFactory(&ExampleProcessorFactory{})
332332
return nil

exporter/opencensusexporter/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,5 +150,5 @@ func (f *exporterFactory) CreateTraceExporter(logger *zap.Logger, config models.
150150

151151
// CreateMetricsExporter creates a metrics exporter based on this config.
152152
func (f *exporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg models.Exporter) (consumer.MetricsConsumer, factories.StopFunc, error) {
153-
return nil, nil, factories.ErrDataTypeIsNotSupported
153+
return nil, nil, models.ErrDataTypeIsNotSupported
154154
}

exporter/opencensusexporter/factory_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/open-telemetry/opentelemetry-service/factories"
2727
"github.com/open-telemetry/opentelemetry-service/internal/compression"
28+
"github.com/open-telemetry/opentelemetry-service/models"
2829
)
2930

3031
func TestCreateDefaultConfig(t *testing.T) {
@@ -40,7 +41,7 @@ func TestCreateMetricsExporter(t *testing.T) {
4041
cfg := factory.CreateDefaultConfig()
4142

4243
_, _, err := factory.CreateMetricsExporter(zap.NewNop(), cfg)
43-
assert.Error(t, err, factories.ErrDataTypeIsNotSupported)
44+
assert.Error(t, err, models.ErrDataTypeIsNotSupported)
4445
}
4546

4647
func TestCreateTraceExporter(t *testing.T) {

exporter/opencensusexporter/opencensusexporter_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
"time"
2121

2222
"github.com/google/go-cmp/cmp"
23-
"github.com/open-telemetry/opentelemetry-service/data"
2423
"github.com/spf13/viper"
24+
25+
"github.com/open-telemetry/opentelemetry-service/data"
2526
)
2627

2728
func TestOpenCensusTraceExportersFromViper(t *testing.T) {

factories/factories.go

-60
Original file line numberDiff line numberDiff line change
@@ -15,75 +15,15 @@
1515
package factories
1616

1717
import (
18-
"context"
19-
"errors"
2018
"fmt"
2119

22-
"github.com/spf13/viper"
2320
"go.uber.org/zap"
2421

2522
"github.com/open-telemetry/opentelemetry-service/consumer"
2623
"github.com/open-telemetry/opentelemetry-service/models"
2724
"github.com/open-telemetry/opentelemetry-service/processor"
28-
"github.com/open-telemetry/opentelemetry-service/receiver"
2925
)
3026

31-
///////////////////////////////////////////////////////////////////////////////
32-
// Receiver factory and its registry.
33-
34-
// ReceiverFactory is factory interface for receivers.
35-
type ReceiverFactory interface {
36-
// Type gets the type of the Receiver created by this factory.
37-
Type() string
38-
39-
// CreateDefaultConfig creates the default configuration for the Receiver.
40-
CreateDefaultConfig() models.Receiver
41-
42-
// CustomUnmarshaler returns a custom unmarshaler for the configuration or nil if
43-
// there is no need for custom unmarshaling. This is typically used if viper.Unmarshal()
44-
// is not sufficient to unmarshal correctly.
45-
CustomUnmarshaler() CustomUnmarshaler
46-
47-
// CreateTraceReceiver creates a trace receiver based on this config.
48-
// If the receiver type does not support tracing or if the config is not valid
49-
// error will be returned instead.
50-
CreateTraceReceiver(ctx context.Context, logger *zap.Logger, cfg models.Receiver,
51-
nextConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error)
52-
53-
// CreateMetricsReceiver creates a metrics receiver based on this config.
54-
// If the receiver type does not support metrics or if the config is not valid
55-
// error will be returned instead.
56-
CreateMetricsReceiver(logger *zap.Logger, cfg models.Receiver,
57-
consumer consumer.MetricsConsumer) (receiver.MetricsReceiver, error)
58-
}
59-
60-
// ErrDataTypeIsNotSupported can be returned by CreateTraceReceiver or
61-
// CreateMetricsReceiver if the particular telemetry data type is not supported
62-
// by the receiver.
63-
var ErrDataTypeIsNotSupported = errors.New("telemetry type is not supported")
64-
65-
// CustomUnmarshaler is a function that un-marshals a viper data into a config struct
66-
// in a custom way.
67-
type CustomUnmarshaler func(v *viper.Viper, viperKey string, intoCfg interface{}) error
68-
69-
// List of registered receiver factories.
70-
var receiverFactories = make(map[string]ReceiverFactory)
71-
72-
// RegisterReceiverFactory registers a receiver factory.
73-
func RegisterReceiverFactory(factory ReceiverFactory) error {
74-
if receiverFactories[factory.Type()] != nil {
75-
panic(fmt.Sprintf("duplicate receiver factory %q", factory.Type()))
76-
}
77-
78-
receiverFactories[factory.Type()] = factory
79-
return nil
80-
}
81-
82-
// GetReceiverFactory gets a receiver factory by type string.
83-
func GetReceiverFactory(typeStr string) ReceiverFactory {
84-
return receiverFactories[typeStr]
85-
}
86-
8727
///////////////////////////////////////////////////////////////////////////////
8828
// Exporter factory and its registry.
8929

factories/factories_test.go

+2-71
Original file line numberDiff line numberDiff line change
@@ -15,84 +15,15 @@
1515
package factories
1616

1717
import (
18-
"context"
1918
"testing"
2019

2120
"go.uber.org/zap"
2221

2322
"github.com/open-telemetry/opentelemetry-service/consumer"
2423
"github.com/open-telemetry/opentelemetry-service/models"
2524
"github.com/open-telemetry/opentelemetry-service/processor"
26-
"github.com/open-telemetry/opentelemetry-service/receiver"
2725
)
2826

29-
type ExampleReceiverFactory struct {
30-
}
31-
32-
// Type gets the type of the Receiver config created by this factory.
33-
func (f *ExampleReceiverFactory) Type() string {
34-
return "examplereceiver"
35-
}
36-
37-
// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this factory.
38-
func (f *ExampleReceiverFactory) CustomUnmarshaler() CustomUnmarshaler {
39-
return nil
40-
}
41-
42-
// CreateDefaultConfig creates the default configuration for the Receiver.
43-
func (f *ExampleReceiverFactory) CreateDefaultConfig() models.Receiver {
44-
return nil
45-
}
46-
47-
// CreateTraceReceiver creates a trace receiver based on this config.
48-
func (f *ExampleReceiverFactory) CreateTraceReceiver(
49-
ctx context.Context,
50-
logger *zap.Logger,
51-
cfg models.Receiver,
52-
nextConsumer consumer.TraceConsumer,
53-
) (receiver.TraceReceiver, error) {
54-
// Not used for this test, just return nil
55-
return nil, nil
56-
}
57-
58-
// CreateMetricsReceiver creates a metrics receiver based on this config.
59-
func (f *ExampleReceiverFactory) CreateMetricsReceiver(
60-
logger *zap.Logger,
61-
cfg models.Receiver,
62-
consumer consumer.MetricsConsumer,
63-
) (receiver.MetricsReceiver, error) {
64-
// Not used for this test, just return nil
65-
return nil, nil
66-
}
67-
68-
func TestRegisterReceiverFactory(t *testing.T) {
69-
f := ExampleReceiverFactory{}
70-
err := RegisterReceiverFactory(&f)
71-
if err != nil {
72-
t.Fatalf("cannot register factory")
73-
}
74-
75-
if &f != GetReceiverFactory(f.Type()) {
76-
t.Fatalf("cannot find factory")
77-
}
78-
79-
// Verify that attempt to register a factory with duplicate name panics
80-
panicked := false
81-
func() {
82-
defer func() {
83-
if r := recover(); r != nil {
84-
panicked = true
85-
}
86-
}()
87-
88-
err = RegisterReceiverFactory(&f)
89-
}()
90-
91-
if !panicked {
92-
t.Fatalf("must panic on double registration")
93-
}
94-
}
95-
9627
type ExampleExporterFactory struct {
9728
}
9829

@@ -163,7 +94,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor(
16394
nextConsumer consumer.TraceConsumer,
16495
cfg models.Processor,
16596
) (processor.TraceProcessor, error) {
166-
return nil, ErrDataTypeIsNotSupported
97+
return nil, models.ErrDataTypeIsNotSupported
16798
}
16899

169100
// CreateMetricsProcessor creates a metrics processor based on this config.
@@ -172,7 +103,7 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor(
172103
nextConsumer consumer.MetricsConsumer,
173104
cfg models.Processor,
174105
) (processor.MetricsProcessor, error) {
175-
return nil, ErrDataTypeIsNotSupported
106+
return nil, models.ErrDataTypeIsNotSupported
176107
}
177108

178109
func TestRegisterProcessorFactory(t *testing.T) {

internal/collector/processor/nodebatcher/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,5 @@ func (f *processorFactory) CreateMetricsProcessor(
101101
nextConsumer consumer.MetricsConsumer,
102102
cfg models.Processor,
103103
) (processor.MetricsProcessor, error) {
104-
return nil, factories.ErrDataTypeIsNotSupported
104+
return nil, models.ErrDataTypeIsNotSupported
105105
}

internal/collector/processor/queued/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,5 @@ func (f *processorFactory) CreateMetricsProcessor(
7575
nextConsumer consumer.MetricsConsumer,
7676
cfg models.Processor,
7777
) (processor.MetricsProcessor, error) {
78-
return nil, factories.ErrDataTypeIsNotSupported
78+
return nil, models.ErrDataTypeIsNotSupported
7979
}

0 commit comments

Comments
 (0)