Skip to content

Commit a6816f0

Browse files
Add receiverhelper functions for creating a simple scraper receiver
1 parent 4a45ae9 commit a6816f0

File tree

4 files changed

+345
-0
lines changed

4 files changed

+345
-0
lines changed

receiver/receiverhelper/receiver.go

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiverhelper
16+
17+
import (
18+
"context"
19+
20+
"go.opentelemetry.io/collector/component"
21+
)
22+
23+
// Start specifies the function invoked when the receiver is being started.
24+
type Start func(context.Context, component.Host) error
25+
26+
// Shutdown specifies the function invoked when the receiver is being shutdown.
27+
type Shutdown func(context.Context) error
28+
29+
// Option apply changes to internal options.
30+
type Option func(*baseReceiver)
31+
32+
// WithStart overrides the default Start function for a receiver.
33+
// The default shutdown function does nothing and always returns nil.
34+
func WithStart(start Start) Option {
35+
return func(o *baseReceiver) {
36+
o.start = start
37+
}
38+
}
39+
40+
// WithShutdown overrides the default Shutdown function for a receiver.
41+
// The default shutdown function does nothing and always returns nil.
42+
func WithShutdown(shutdown Shutdown) Option {
43+
return func(o *baseReceiver) {
44+
o.shutdown = shutdown
45+
}
46+
}
47+
48+
type baseReceiver struct {
49+
fullName string
50+
start Start
51+
shutdown Shutdown
52+
}
53+
54+
// Construct the internalOptions from multiple Option.
55+
func newBaseReceiver(fullName string, options ...Option) baseReceiver {
56+
br := baseReceiver{fullName: fullName}
57+
58+
for _, op := range options {
59+
op(&br)
60+
}
61+
62+
return br
63+
}
64+
65+
// Start the receiver, invoked during service start.
66+
func (br *baseReceiver) Start(ctx context.Context, host component.Host) error {
67+
if br.start != nil {
68+
return br.start(ctx, host)
69+
}
70+
return nil
71+
}
72+
73+
// Shutdown the receiver, invoked during service shutdown.
74+
func (br *baseReceiver) Shutdown(ctx context.Context) error {
75+
if br.shutdown != nil {
76+
return br.shutdown(ctx)
77+
}
78+
return nil
79+
}
+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiverhelper
16+
17+
import (
18+
"context"
19+
"errors"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
"go.opentelemetry.io/collector/component"
25+
"go.opentelemetry.io/collector/component/componenttest"
26+
)
27+
28+
const testFullName = "testFullName"
29+
30+
func TestWithStart(t *testing.T) {
31+
startCalled := false
32+
start := func(context.Context, component.Host) error { startCalled = true; return nil }
33+
34+
bp := newBaseReceiver(testFullName, WithStart(start))
35+
assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
36+
assert.True(t, startCalled)
37+
}
38+
39+
func TestWithStart_ReturnError(t *testing.T) {
40+
want := errors.New("my_error")
41+
start := func(context.Context, component.Host) error { return want }
42+
43+
bp := newBaseReceiver(testFullName, WithStart(start))
44+
assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost()))
45+
}
46+
47+
func TestWithShutdown(t *testing.T) {
48+
shutdownCalled := false
49+
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
50+
51+
bp := newBaseReceiver(testFullName, WithShutdown(shutdown))
52+
assert.NoError(t, bp.Shutdown(context.Background()))
53+
assert.True(t, shutdownCalled)
54+
}
55+
56+
func TestWithShutdown_ReturnError(t *testing.T) {
57+
want := errors.New("my_error")
58+
shutdownErr := func(context.Context) error { return want }
59+
60+
bp := newBaseReceiver(testFullName, WithShutdown(shutdownErr))
61+
assert.Equal(t, want, bp.Shutdown(context.Background()))
62+
}

receiver/receiverhelper/scraper.go

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiverhelper
16+
17+
import (
18+
"context"
19+
"time"
20+
21+
"go.opentelemetry.io/collector/component"
22+
"go.opentelemetry.io/collector/component/componenterror"
23+
"go.opentelemetry.io/collector/config/configmodels"
24+
"go.opentelemetry.io/collector/consumer"
25+
"go.opentelemetry.io/collector/consumer/pdata"
26+
)
27+
28+
// Scraper provides a function to scrape metrics.
29+
type Scraper interface {
30+
Scrape(context.Context) (pdata.Metrics, error)
31+
}
32+
33+
// Scrape specifies the function that will be invoked every collection interval when
34+
// the receiver is configured as a Scraper.
35+
type Scrape func(context.Context) (pdata.Metrics, error)
36+
37+
// ScraperOption apply changes to internal scraper options.
38+
type ScraperOption func(*baseScraper)
39+
40+
type baseScraper struct {
41+
baseReceiver
42+
collectionInterval time.Duration
43+
scraper Scraper
44+
nextConsumer consumer.MetricsConsumer
45+
done chan struct{}
46+
}
47+
48+
// NewScraper creates a MetricsReceiver that calls Scrape at the specified collection
49+
// interval, reports observability information, and passes the scraped metrics to the
50+
// next consumer.
51+
func NewScraper(
52+
config configmodels.Receiver,
53+
collectionInterval time.Duration,
54+
scraper Scraper,
55+
nextConsumer consumer.MetricsConsumer,
56+
options ...Option,
57+
) (component.Receiver, error) {
58+
if nextConsumer == nil {
59+
return nil, componenterror.ErrNilNextConsumer
60+
}
61+
62+
bs := &baseScraper{
63+
baseReceiver: newBaseReceiver(config.Name(), options...),
64+
collectionInterval: collectionInterval,
65+
scraper: scraper,
66+
nextConsumer: nextConsumer,
67+
done: make(chan struct{}),
68+
}
69+
70+
// wrap the start function with a call to start scraping
71+
start := bs.start
72+
bs.start = func(ctx context.Context, host component.Host) error {
73+
if start != nil {
74+
if err := start(ctx, host); err != nil {
75+
return err
76+
}
77+
}
78+
79+
bs.startScraping()
80+
return nil
81+
}
82+
83+
// wrap the shutdown function with a call to stop scraping
84+
shutdown := bs.shutdown
85+
bs.shutdown = func(ctx context.Context) error {
86+
bs.stopScraping()
87+
88+
if shutdown != nil {
89+
shutdown(ctx)
90+
}
91+
return nil
92+
}
93+
94+
return bs, nil
95+
}
96+
97+
// startScraping initiates a ticker that calls Scrape based on the configured
98+
// collection interval.
99+
func (bs *baseScraper) startScraping() {
100+
go func() {
101+
ticker := time.NewTicker(bs.collectionInterval)
102+
defer ticker.Stop()
103+
104+
for {
105+
select {
106+
case <-ticker.C:
107+
bs.scrapeAndReport(context.Background())
108+
case <-bs.done:
109+
return
110+
}
111+
}
112+
}()
113+
}
114+
115+
// scrapeAndReport calls the Scrape function of the provided Scraper, records
116+
// observability information, and passes the scraped metrics to the next component.
117+
// TODO: Add observability metrics support
118+
func (bs *baseScraper) scrapeAndReport(ctx context.Context) {
119+
metrics, err := bs.scraper.Scrape(ctx)
120+
if err != nil {
121+
return
122+
}
123+
124+
bs.nextConsumer.ConsumeMetrics(ctx, metrics)
125+
}
126+
127+
// stopScraping stops the ticker
128+
func (bs *baseScraper) stopScraping() {
129+
close(bs.done)
130+
}
+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiverhelper
16+
17+
import (
18+
"context"
19+
"errors"
20+
"testing"
21+
"time"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"go.opentelemetry.io/collector/component/componenttest"
27+
"go.opentelemetry.io/collector/config/configmodels"
28+
"go.opentelemetry.io/collector/consumer/pdata"
29+
"go.opentelemetry.io/collector/exporter/exportertest"
30+
)
31+
32+
var testCfg = &configmodels.ReceiverSettings{
33+
TypeVal: testFullName,
34+
NameVal: testFullName,
35+
}
36+
37+
func TestNewScraper(t *testing.T) {
38+
ch := make(chan int, 10)
39+
ts, err := NewScraper(testCfg, time.Microsecond, &testScraper{ch: ch}, exportertest.NewNopMetricsExporter())
40+
require.NoError(t, err)
41+
42+
assert.NoError(t, ts.Start(context.Background(), componenttest.NewNopHost()))
43+
require.Eventually(t, func() bool { return (<-ch) > 5 }, 100*time.Millisecond, time.Millisecond)
44+
assert.NoError(t, ts.Shutdown(context.Background()))
45+
}
46+
47+
type testScraper struct {
48+
ch chan int
49+
timesScrapeCalled int
50+
}
51+
52+
func (ts *testScraper) Scrape(ctx context.Context) (pdata.Metrics, error) {
53+
ts.timesScrapeCalled++
54+
ts.ch <- ts.timesScrapeCalled
55+
return pdata.NewMetrics(), nil
56+
}
57+
58+
func TestNewScraper_ScrapeError(t *testing.T) {
59+
want := errors.New("my_error")
60+
ts, err := NewScraper(testCfg, time.Millisecond, &testScraperError{err: want}, exportertest.NewNopMetricsExporter())
61+
require.NoError(t, err)
62+
63+
assert.NoError(t, ts.Start(context.Background(), componenttest.NewNopHost()))
64+
// TODO validate observability data populated correctly
65+
assert.NoError(t, ts.Shutdown(context.Background()))
66+
}
67+
68+
type testScraperError struct {
69+
err error
70+
}
71+
72+
func (ts *testScraperError) Scrape(ctx context.Context) (pdata.Metrics, error) {
73+
return pdata.NewMetrics(), ts.err
74+
}

0 commit comments

Comments
 (0)