Skip to content

Commit d8901b5

Browse files
Add receiverhelper functions for creating metrics receiver, optionally adding scrape targets
1 parent dab0c56 commit d8901b5

File tree

3 files changed

+682
-0
lines changed

3 files changed

+682
-0
lines changed

receiver/receiverhelper/receiver.go

+265
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiverhelper
16+
17+
import (
18+
"context"
19+
"time"
20+
21+
"go.opentelemetry.io/collector/component"
22+
"go.opentelemetry.io/collector/component/componenterror"
23+
"go.opentelemetry.io/collector/config/configmodels"
24+
"go.opentelemetry.io/collector/consumer"
25+
)
26+
27+
// Start specifies the function invoked when the receiver is being started.
28+
type Start func(context.Context, component.Host) error
29+
30+
// Shutdown specifies the function invoked when the receiver is being shutdown.
31+
type Shutdown func(context.Context) error
32+
33+
// Option apply changes to internal options.
34+
type Option func(*baseReceiver)
35+
36+
// WithStart overrides the default Start function for a receiver.
37+
// The default shutdown function does nothing and always returns nil.
38+
func WithStart(start Start) Option {
39+
return func(o *baseReceiver) {
40+
o.start = start
41+
}
42+
}
43+
44+
// WithShutdown overrides the default Shutdown function for a receiver.
45+
// The default shutdown function does nothing and always returns nil.
46+
func WithShutdown(shutdown Shutdown) Option {
47+
return func(o *baseReceiver) {
48+
o.shutdown = shutdown
49+
}
50+
}
51+
52+
type baseReceiver struct {
53+
fullName string
54+
start Start
55+
shutdown Shutdown
56+
}
57+
58+
// Construct the internalOptions from multiple Option.
59+
func newBaseReceiver(fullName string, options ...Option) baseReceiver {
60+
br := baseReceiver{fullName: fullName}
61+
62+
for _, op := range options {
63+
op(&br)
64+
}
65+
66+
return br
67+
}
68+
69+
// Start the receiver, invoked during service start.
70+
func (br *baseReceiver) Start(ctx context.Context, host component.Host) error {
71+
if br.start != nil {
72+
return br.start(ctx, host)
73+
}
74+
return nil
75+
}
76+
77+
// Shutdown the receiver, invoked during service shutdown.
78+
func (br *baseReceiver) Shutdown(ctx context.Context) error {
79+
if br.shutdown != nil {
80+
return br.shutdown(ctx)
81+
}
82+
return nil
83+
}
84+
85+
// MetricOption apply changes to internal options.
86+
type MetricOption func(*metricsReceiver)
87+
88+
// WithBaseOptions applies any base options to a metrics receiver.
89+
func WithBaseOptions(options ...Option) MetricOption {
90+
return func(o *metricsReceiver) {
91+
for _, option := range options {
92+
option(&o.baseReceiver)
93+
}
94+
}
95+
}
96+
97+
// WithDefaultCollectionInterval overrides the default collection
98+
// interval (1 minute) that will be applied to all scrapers if not
99+
// overridden by the individual scraper.
100+
func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) MetricOption {
101+
return func(o *metricsReceiver) {
102+
o.defaultCollectionInterval = defaultCollectionInterval
103+
}
104+
}
105+
106+
// AddScraper configures the provided scrape function to be called with
107+
// the specified options, and at the specified collection interval (one
108+
// minute by default).
109+
//
110+
// Observability information will be reported, and the scraped metrics
111+
// will be passed to the next consumer.
112+
func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption {
113+
return func(o *metricsReceiver) {
114+
o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...))
115+
}
116+
}
117+
118+
type metricsReceiver struct {
119+
baseReceiver
120+
defaultCollectionInterval time.Duration
121+
nextConsumer consumer.MetricsConsumer
122+
123+
scrapers []*scraper
124+
done chan struct{}
125+
}
126+
127+
// NewMetricReceiver creates a Receiver with the configured options.
128+
func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.MetricsConsumer, options ...MetricOption) (component.Receiver, error) {
129+
if nextConsumer == nil {
130+
return nil, componenterror.ErrNilNextConsumer
131+
}
132+
133+
mr := &metricsReceiver{
134+
baseReceiver: newBaseReceiver(config.Name()),
135+
defaultCollectionInterval: time.Minute,
136+
nextConsumer: nextConsumer,
137+
done: make(chan struct{}),
138+
}
139+
140+
for _, op := range options {
141+
op(mr)
142+
}
143+
144+
// wrap the start function with a call to initialize scrapers
145+
// and start scraping
146+
start := mr.start
147+
mr.start = func(ctx context.Context, host component.Host) error {
148+
if start != nil {
149+
if err := start(ctx, host); err != nil {
150+
return err
151+
}
152+
}
153+
154+
if err := mr.initializeScrapers(ctx); err != nil {
155+
return err
156+
}
157+
158+
mr.startScraping()
159+
return nil
160+
}
161+
162+
// wrap the shutdown function with a call to close scrapers
163+
// and stop scraping
164+
shutdown := mr.shutdown
165+
mr.shutdown = func(ctx context.Context) error {
166+
mr.stopScraping()
167+
168+
var errors []error
169+
170+
if err := mr.closeScrapers(ctx); err != nil {
171+
errors = append(errors, err)
172+
}
173+
174+
if shutdown != nil {
175+
if err := shutdown(ctx); err != nil {
176+
errors = append(errors, err)
177+
}
178+
}
179+
180+
return componenterror.CombineErrors(errors)
181+
}
182+
183+
return mr, nil
184+
}
185+
186+
// initializeScrapers initializes all the scrapers
187+
func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
188+
for _, scraper := range mr.scrapers {
189+
if scraper.initialize == nil {
190+
continue
191+
}
192+
193+
if err := scraper.initialize(ctx); err != nil {
194+
return err
195+
}
196+
}
197+
198+
return nil
199+
}
200+
201+
// startScraping initiates a ticker that calls Scrape based on the configured
202+
// collection interval.
203+
func (mr *metricsReceiver) startScraping() {
204+
// TODO1: use one ticker for each set of scrapers that have the same collection interval.
205+
// TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions
206+
// that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper
207+
// & ResourceScraper interfaces in the host metrics receiver). That will allow data
208+
// from multiple scrapers (that have the same collection interval) to be batched.
209+
210+
for i := 0; i < len(mr.scrapers); i++ {
211+
scraper := mr.scrapers[i]
212+
go func() {
213+
collectionInterval := mr.defaultCollectionInterval
214+
if scraper.cfg.CollectionInterval() != 0 {
215+
collectionInterval = scraper.cfg.CollectionInterval()
216+
}
217+
218+
ticker := time.NewTicker(collectionInterval)
219+
defer ticker.Stop()
220+
221+
for {
222+
select {
223+
case <-ticker.C:
224+
mr.scrapeAndReport(context.Background(), scraper)
225+
case <-mr.done:
226+
return
227+
}
228+
}
229+
}()
230+
}
231+
}
232+
233+
// scrapeAndReport calls the Scrape function of the provided Scraper, records
234+
// observability information, and passes the scraped metrics to the next component.
235+
func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) {
236+
// TODO: Add observability metrics support
237+
metrics, err := scraper.scrape(ctx)
238+
if err != nil {
239+
return
240+
}
241+
242+
mr.nextConsumer.ConsumeMetrics(ctx, metrics)
243+
}
244+
245+
// stopScraping stops the ticker
246+
func (mr *metricsReceiver) stopScraping() {
247+
close(mr.done)
248+
}
249+
250+
// closeScrapers closes all the scrapers
251+
func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {
252+
var errors []error
253+
254+
for _, scraper := range mr.scrapers {
255+
if scraper.close == nil {
256+
continue
257+
}
258+
259+
if err := scraper.close(ctx); err != nil {
260+
errors = append(errors, err)
261+
}
262+
}
263+
264+
return componenterror.CombineErrors(errors)
265+
}

0 commit comments

Comments
 (0)