@@ -21,9 +21,9 @@ import (
21
21
"go.uber.org/zap"
22
22
23
23
"github.com/open-telemetry/opentelemetry-service/consumer"
24
+ "github.com/open-telemetry/opentelemetry-service/factories"
24
25
"github.com/open-telemetry/opentelemetry-service/internal"
25
- "github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
26
- "github.com/open-telemetry/opentelemetry-service/pkg/factories"
26
+ "github.com/open-telemetry/opentelemetry-service/models"
27
27
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
28
28
"github.com/open-telemetry/opentelemetry-service/receiver"
29
29
)
@@ -76,7 +76,7 @@ func (rcv *builtReceiver) Start(asyncErrorChan chan<- error) error {
76
76
}
77
77
78
78
// Receivers is a map of receivers created from receiver configs.
79
- type Receivers map [configmodels .Receiver ]* builtReceiver
79
+ type Receivers map [models .Receiver ]* builtReceiver
80
80
81
81
// StopAll stops all receivers.
82
82
func (rcvs Receivers ) StopAll () {
@@ -101,14 +101,14 @@ func (rcvs Receivers) StartAll(logger *zap.Logger, asyncErrorChan chan<- error)
101
101
// ReceiversBuilder builds receivers from config.
102
102
type ReceiversBuilder struct {
103
103
logger * zap.Logger
104
- config * configmodels .ConfigV2
104
+ config * models .ConfigV2
105
105
pipelineProcessors PipelineProcessors
106
106
}
107
107
108
108
// NewReceiversBuilder creates a new ReceiversBuilder. Call Build() on the returned value.
109
109
func NewReceiversBuilder (
110
110
logger * zap.Logger ,
111
- config * configmodels .ConfigV2 ,
111
+ config * models .ConfigV2 ,
112
112
pipelineProcessors PipelineProcessors ,
113
113
) * ReceiversBuilder {
114
114
return & ReceiversBuilder {logger , config , pipelineProcessors }
@@ -131,7 +131,7 @@ func (rb *ReceiversBuilder) Build() (Receivers, error) {
131
131
}
132
132
133
133
// hasReceiver returns true if the pipeline is attached to specified receiver.
134
- func hasReceiver (pipeline * configmodels .Pipeline , receiverName string ) bool {
134
+ func hasReceiver (pipeline * models .Pipeline , receiverName string ) bool {
135
135
for _ , name := range pipeline .Receivers {
136
136
if name == receiverName {
137
137
return true
@@ -140,16 +140,16 @@ func hasReceiver(pipeline *configmodels.Pipeline, receiverName string) bool {
140
140
return false
141
141
}
142
142
143
- type attachedPipelines map [configmodels .DataType ][]* builtProcessor
143
+ type attachedPipelines map [models .DataType ][]* builtProcessor
144
144
145
- func (rb * ReceiversBuilder ) findPipelinesToAttach (config configmodels .Receiver ) (attachedPipelines , error ) {
145
+ func (rb * ReceiversBuilder ) findPipelinesToAttach (config models .Receiver ) (attachedPipelines , error ) {
146
146
// A receiver may be attached to multiple pipelines. Pipelines may consume different
147
147
// data types. We need to compile the list of pipelines of each type that must be
148
148
// attached to this receiver according to configuration.
149
149
150
150
pipelinesToAttach := make (attachedPipelines )
151
- pipelinesToAttach [configmodels .TracesDataType ] = make ([]* builtProcessor , 0 )
152
- pipelinesToAttach [configmodels .MetricsDataType ] = make ([]* builtProcessor , 0 )
151
+ pipelinesToAttach [models .TracesDataType ] = make ([]* builtProcessor , 0 )
152
+ pipelinesToAttach [models .MetricsDataType ] = make ([]* builtProcessor , 0 )
153
153
154
154
// Iterate over all pipelines.
155
155
for _ , pipelineCfg := range rb .config .Pipelines {
@@ -173,8 +173,8 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver)
173
173
174
174
func (rb * ReceiversBuilder ) attachReceiverToPipelines (
175
175
factory factories.ReceiverFactory ,
176
- dataType configmodels .DataType ,
177
- config configmodels .Receiver ,
176
+ dataType models .DataType ,
177
+ config models .Receiver ,
178
178
receiver * builtReceiver ,
179
179
pipelineProcessors []* builtProcessor ,
180
180
) error {
@@ -183,14 +183,14 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
183
183
// sure its output is fanned out to all attached pipelines.
184
184
var err error
185
185
switch dataType {
186
- case configmodels .TracesDataType :
186
+ case models .TracesDataType :
187
187
// First, create the fan out junction point.
188
188
junction := buildFanoutTraceConsumer (pipelineProcessors )
189
189
190
190
// Now create the receiver and tell it to send to the junction point.
191
191
receiver .trace , err = factory .CreateTraceReceiver (context .Background (), config , junction )
192
192
193
- case configmodels .MetricsDataType :
193
+ case models .MetricsDataType :
194
194
junction := buildFanoutMetricConsumer (pipelineProcessors )
195
195
receiver .metrics , err = factory .CreateMetricsReceiver (config , junction )
196
196
}
@@ -213,7 +213,7 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
213
213
return nil
214
214
}
215
215
216
- func (rb * ReceiversBuilder ) buildReceiver (config configmodels .Receiver ) (* builtReceiver , error ) {
216
+ func (rb * ReceiversBuilder ) buildReceiver (config models .Receiver ) (* builtReceiver , error ) {
217
217
218
218
// First find pipelines that must be attached to this receiver.
219
219
pipelinesToAttach , err := rb .findPipelinesToAttach (config )
0 commit comments