@@ -125,7 +125,7 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) {
125
125
return sarama .NewConsumerGroup (config .Brokers , config .GroupID , saramaConfig )
126
126
}
127
127
128
- func (c * kafkaTracesConsumer ) Start (_ context.Context , host component.Host ) error {
128
+ func (c * kafkaTracesConsumer ) Start (_ context.Context , _ component.Host ) error {
129
129
ctx , cancel := context .WithCancel (context .Background ())
130
130
c .cancelConsumeLoop = cancel
131
131
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
@@ -160,7 +160,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
160
160
}
161
161
go func () {
162
162
if err := c .consumeLoop (ctx , consumerGroup ); err != nil {
163
- host . ReportFatalError ( err )
163
+ c . settings . ReportStatus ( component . NewFatalErrorEvent ( err ) )
164
164
}
165
165
}()
166
166
<- consumerGroup .ready
@@ -209,7 +209,7 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshaler
209
209
}, nil
210
210
}
211
211
212
- func (c * kafkaMetricsConsumer ) Start (_ context.Context , host component.Host ) error {
212
+ func (c * kafkaMetricsConsumer ) Start (_ context.Context , _ component.Host ) error {
213
213
ctx , cancel := context .WithCancel (context .Background ())
214
214
c .cancelConsumeLoop = cancel
215
215
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
@@ -244,7 +244,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
244
244
}
245
245
go func () {
246
246
if err := c .consumeLoop (ctx , metricsConsumerGroup ); err != nil {
247
- host . ReportFatalError ( err )
247
+ c . settings . ReportStatus ( component . NewFatalErrorEvent ( err ) )
248
248
}
249
249
}()
250
250
<- metricsConsumerGroup .ready
@@ -293,7 +293,7 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshaler Log
293
293
}, nil
294
294
}
295
295
296
- func (c * kafkaLogsConsumer ) Start (_ context.Context , host component.Host ) error {
296
+ func (c * kafkaLogsConsumer ) Start (_ context.Context , _ component.Host ) error {
297
297
ctx , cancel := context .WithCancel (context .Background ())
298
298
c .cancelConsumeLoop = cancel
299
299
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
@@ -328,7 +328,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
328
328
}
329
329
go func () {
330
330
if err := c .consumeLoop (ctx , logsConsumerGroup ); err != nil {
331
- host . ReportFatalError ( err )
331
+ c . settings . ReportStatus ( component . NewFatalErrorEvent ( err ) )
332
332
}
333
333
}()
334
334
<- logsConsumerGroup .ready
0 commit comments