Skip to content

Commit b760482

Browse files
Extend receiverhelper scraper functions to simplify use of scrape (scrape metrics slice or resource metrics slice instead of having to return a metrics data object)
1 parent 9ddaf6f commit b760482

File tree

3 files changed

+271
-71
lines changed

3 files changed

+271
-71
lines changed

receiver/receiverhelper/receiver.go

+105-23
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"go.opentelemetry.io/collector/component/componenterror"
2323
"go.opentelemetry.io/collector/config/configmodels"
2424
"go.opentelemetry.io/collector/consumer"
25+
"go.opentelemetry.io/collector/consumer/pdata"
2526
)
2627

2728
// Start specifies the function invoked when the receiver is being started.
@@ -103,15 +104,27 @@ func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) Metr
103104
}
104105
}
105106

106-
// AddScraper configures the provided scrape function to be called with
107-
// the specified options, and at the specified collection interval (one
108-
// minute by default).
107+
// AddMetricsScraper configures the provided scrape function to be called
108+
// with the specified options, and at the specified collection interval
109+
// (one minute by default).
109110
//
110111
// Observability information will be reported, and the scraped metrics
111112
// will be passed to the next consumer.
112-
func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption {
113+
func AddMetricsScraper(cfg ScraperConfig, scrape ScrapeMetrics, options ...ScraperOption) MetricOption {
113114
return func(o *metricsReceiver) {
114-
o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...))
115+
o.metricScrapers = append(o.metricScrapers, newMetricsScraper(cfg, scrape, options...))
116+
}
117+
}
118+
119+
// AddResourceMetricsScraper configures the provided scrape function to
120+
// be called with the specified options, and at the specified collection
121+
// interval (one minute by default).
122+
//
123+
// Observability information will be reported, and the scraped resource
124+
// metrics will be passed to the next consumer.
125+
func AddResourceMetricsScraper(cfg ScraperConfig, scrape ScrapeResourceMetrics, options ...ScraperOption) MetricOption {
126+
return func(o *metricsReceiver) {
127+
o.resourceMetricScrapers = append(o.resourceMetricScrapers, newResourceMetricsScraper(cfg, scrape, options...))
115128
}
116129
}
117130

@@ -120,8 +133,9 @@ type metricsReceiver struct {
120133
defaultCollectionInterval time.Duration
121134
nextConsumer consumer.MetricsConsumer
122135

123-
scrapers []*scraper
124-
done chan struct{}
136+
metricScrapers []*metricsScraper
137+
resourceMetricScrapers []*resourceMetricsScraper
138+
done chan struct{}
125139
}
126140

127141
// NewMetricReceiver creates a Receiver with the configured options.
@@ -185,7 +199,7 @@ func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.Metri
185199

186200
// initializeScrapers initializes all the scrapers
187201
func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
188-
for _, scraper := range mr.scrapers {
202+
for _, scraper := range mr.allBaseScrapers() {
189203
if scraper.initialize == nil {
190204
continue
191205
}
@@ -201,14 +215,15 @@ func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
201215
// startScraping initiates a ticker that calls Scrape based on the configured
202216
// collection interval.
203217
func (mr *metricsReceiver) startScraping() {
204-
// TODO1: use one ticker for each set of scrapers that have the same collection interval.
205-
// TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions
206-
// that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper
207-
// & ResourceScraper interfaces in the host metrics receiver). That will allow data
208-
// from multiple scrapers (that have the same collection interval) to be batched.
209-
210-
for i := 0; i < len(mr.scrapers); i++ {
211-
scraper := mr.scrapers[i]
218+
// TODO: use one ticker for each set of scrapers that have the same collection interval.
219+
220+
mr.startScrapingMetrics()
221+
mr.startScrapingResourceMetrics()
222+
}
223+
224+
func (mr *metricsReceiver) startScrapingMetrics() {
225+
for i := 0; i < len(mr.metricScrapers); i++ {
226+
scraper := mr.metricScrapers[i]
212227
go func() {
213228
collectionInterval := mr.defaultCollectionInterval
214229
if scraper.cfg.CollectionInterval() != 0 {
@@ -221,7 +236,7 @@ func (mr *metricsReceiver) startScraping() {
221236
for {
222237
select {
223238
case <-ticker.C:
224-
mr.scrapeAndReport(context.Background(), scraper)
239+
mr.scrapeMetricsAndReport(context.Background(), scraper)
225240
case <-mr.done:
226241
return
227242
}
@@ -230,16 +245,71 @@ func (mr *metricsReceiver) startScraping() {
230245
}
231246
}
232247

233-
// scrapeAndReport calls the Scrape function of the provided Scraper, records
234-
// observability information, and passes the scraped metrics to the next component.
235-
func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) {
248+
// scrapeMetricsAndReport calls the Scrape function of the provided Metrics Scraper,
249+
// records observability information, and passes the scraped metrics to the next
250+
// component.
251+
func (mr *metricsReceiver) scrapeMetricsAndReport(ctx context.Context, ms *metricsScraper) {
236252
// TODO: Add observability metrics support
237-
metrics, err := scraper.scrape(ctx)
253+
metrics, err := ms.scrape(ctx)
238254
if err != nil {
239255
return
240256
}
241257

242-
mr.nextConsumer.ConsumeMetrics(ctx, metrics)
258+
mr.nextConsumer.ConsumeMetrics(ctx, metricSliceToMetricData(metrics))
259+
}
260+
261+
func metricSliceToMetricData(metrics pdata.MetricSlice) pdata.Metrics {
262+
rms := pdata.NewResourceMetricsSlice()
263+
rms.Resize(1)
264+
rm := rms.At(0)
265+
ilms := rm.InstrumentationLibraryMetrics()
266+
ilms.Resize(1)
267+
ilm := ilms.At(0)
268+
metrics.MoveAndAppendTo(ilm.Metrics())
269+
return resourceMetricsSliceToMetricData(rms)
270+
}
271+
272+
func (mr *metricsReceiver) startScrapingResourceMetrics() {
273+
for i := 0; i < len(mr.resourceMetricScrapers); i++ {
274+
scraper := mr.resourceMetricScrapers[i]
275+
go func() {
276+
collectionInterval := mr.defaultCollectionInterval
277+
if scraper.cfg.CollectionInterval() != 0 {
278+
collectionInterval = scraper.cfg.CollectionInterval()
279+
}
280+
281+
ticker := time.NewTicker(collectionInterval)
282+
defer ticker.Stop()
283+
284+
for {
285+
select {
286+
case <-ticker.C:
287+
mr.scrapeResourceMetricsAndReport(context.Background(), scraper)
288+
case <-mr.done:
289+
return
290+
}
291+
}
292+
}()
293+
}
294+
}
295+
296+
// scrapeResourceMetricsAndReport calls the Scrape function of the provided Resource
297+
// Metrics Scrapers, records observability information, and passes the scraped metrics
298+
// to the next component.
299+
func (mr *metricsReceiver) scrapeResourceMetricsAndReport(ctx context.Context, rms *resourceMetricsScraper) {
300+
// TODO: Add observability metrics support
301+
metrics, err := rms.scrape(ctx)
302+
if err != nil {
303+
return
304+
}
305+
306+
mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics))
307+
}
308+
309+
func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics {
310+
md := pdata.NewMetrics()
311+
resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
312+
return md
243313
}
244314

245315
// stopScraping stops the ticker
@@ -251,7 +321,7 @@ func (mr *metricsReceiver) stopScraping() {
251321
func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {
252322
var errors []error
253323

254-
for _, scraper := range mr.scrapers {
324+
for _, scraper := range mr.allBaseScrapers() {
255325
if scraper.close == nil {
256326
continue
257327
}
@@ -263,3 +333,15 @@ func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {
263333

264334
return componenterror.CombineErrors(errors)
265335
}
336+
337+
func (mr *metricsReceiver) allBaseScrapers() []baseScraper {
338+
scrapers := make([]baseScraper, len(mr.metricScrapers)+len(mr.resourceMetricScrapers))
339+
for i, ms := range mr.metricScrapers {
340+
scrapers[i] = ms.baseScraper
341+
}
342+
startIdx := len(mr.metricScrapers)
343+
for i, rms := range mr.resourceMetricScrapers {
344+
scrapers[startIdx+i] = rms.baseScraper
345+
}
346+
return scrapers
347+
}

0 commit comments

Comments
 (0)