Skip to content

Commit 965e053

Browse files
committed
Add collector package to collect data from assets
1 parent 09f0d27 commit 965e053

File tree

6 files changed

+793
-0
lines changed

6 files changed

+793
-0
lines changed

internal/collector/asset_iterator.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package collector
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/metal-toolbox/alloy/internal/metrics"
9+
"github.com/metal-toolbox/alloy/internal/model"
10+
"github.com/metal-toolbox/alloy/internal/store"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/sirupsen/logrus"
13+
"go.opentelemetry.io/otel"
14+
)
15+
16+
// TODO: this should run as a separate process -
17+
// the iterator can be run as the alloy-scheduler, which periodically
18+
// sets conditions for data collection on servers, and the collector just fulfills the conditions.
19+
20+
var (
21+
ErrFetcherQuery = errors.New("error querying asset data from store")
22+
stageLabelFetcher = prometheus.Labels{"stage": "fetcher"}
23+
)
24+
25+
// AssetIterator holds methods to recurse over assets in a store and return them over the asset channel.
26+
type AssetIterator struct {
27+
store store.Repository
28+
assetCh chan *model.Asset
29+
logger *logrus.Logger
30+
}
31+
32+
// NewAssetIterator is a constructor method that returns an AssetIterator.
33+
//
34+
// The returned AssetIterator will recurse over all assets in the store and send them over the asset channel,
35+
// The caller of this method should invoke AssetChannel() to retrieve the channel to read assets from.
36+
func NewAssetIterator(repository store.Repository, logger *logrus.Logger) *AssetIterator {
37+
return &AssetIterator{store: repository, logger: logger, assetCh: make(chan *model.Asset, 1)}
38+
}
39+
40+
// Channel returns the channel to read assets from when the fetcher is invoked through its Iter* method.
41+
func (s *AssetIterator) Channel() <-chan *model.Asset {
42+
return s.assetCh
43+
}
44+
45+
// IterInBatches queries the store for assets in batches, returning them over the assetCh
46+
//
47+
// nolint:gocyclo // for now it makes sense to have the iter method logic in one method
48+
func (s *AssetIterator) IterInBatches(ctx context.Context, batchSize int, pauser *Pauser) {
49+
defer close(s.assetCh)
50+
51+
tracer := otel.Tracer("collector.AssetIterator")
52+
ctx, span := tracer.Start(ctx, "IterInBatches()")
53+
54+
defer span.End()
55+
56+
assets, total, err := s.store.AssetsByOffsetLimit(ctx, 1, batchSize)
57+
if err != nil {
58+
// count serverService query errors
59+
if errors.Is(err, ErrFetcherQuery) {
60+
metrics.ServerServiceQueryErrorCount.With(stageLabelFetcher).Inc()
61+
}
62+
63+
s.logger.WithError(err).Error(ErrFetcherQuery)
64+
65+
return
66+
}
67+
68+
// count assets retrieved
69+
metrics.ServerServiceAssetsRetrieved.With(stageLabelFetcher).Add(float64(len(assets)))
70+
71+
// submit the assets collected in the first request
72+
for _, asset := range assets {
73+
s.assetCh <- asset
74+
75+
// count assets sent to the collector
76+
metrics.AssetsSent.With(stageLabelFetcher).Inc()
77+
}
78+
79+
// all assets fetched in first query
80+
if len(assets) == total || total <= batchSize {
81+
return
82+
}
83+
84+
iterations := total / batchSize
85+
limit := batchSize
86+
87+
s.logger.WithFields(logrus.Fields{
88+
"total": total,
89+
"iterations": iterations,
90+
"limit": limit,
91+
}).Trace()
92+
93+
// continue from offset 2
94+
for offset := 2; offset < iterations+1; offset++ {
95+
// idle when pause flag is set and context isn't canceled.
96+
for pauser.Value() && ctx.Err() == nil {
97+
time.Sleep(1 * time.Second)
98+
}
99+
100+
// context canceled
101+
if ctx.Err() != nil {
102+
s.logger.WithError(err).Error("aborting collection")
103+
104+
break
105+
}
106+
107+
assets, _, err := s.store.AssetsByOffsetLimit(ctx, offset, limit)
108+
if err != nil {
109+
if errors.Is(err, ErrFetcherQuery) {
110+
metrics.ServerServiceQueryErrorCount.With(stageLabelFetcher).Inc()
111+
}
112+
113+
s.logger.WithError(err).Warn(ErrFetcherQuery)
114+
}
115+
116+
s.logger.WithFields(logrus.Fields{
117+
"offset": offset,
118+
"limit": limit,
119+
"total": total,
120+
"got": len(assets),
121+
}).Trace()
122+
123+
if len(assets) == 0 {
124+
break
125+
}
126+
127+
// count assets retrieved
128+
metrics.ServerServiceAssetsRetrieved.With(stageLabelFetcher).Add(float64(len(assets)))
129+
130+
for _, asset := range assets {
131+
s.assetCh <- asset
132+
133+
// count assets sent to collector
134+
metrics.AssetsSent.With(stageLabelFetcher).Inc()
135+
}
136+
}
137+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package collector
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
8+
"github.com/metal-toolbox/alloy/internal/store"
9+
"github.com/metal-toolbox/alloy/internal/store/mock"
10+
"github.com/sirupsen/logrus"
11+
"github.com/stretchr/testify/assert"
12+
"go.uber.org/goleak"
13+
)
14+
15+
func newTestAssetIterator(repository store.Repository) *AssetIterator {
16+
logger := logrus.New()
17+
// nolint: gocritic // comment kept for reference
18+
// logger.Level = logrus.TraceLevel
19+
20+
return NewAssetIterator(repository, logger)
21+
}
22+
23+
func Test_IterInBatches(t *testing.T) {
24+
ignorefunc := "go.opencensus.io/stats/view.(*worker).start"
25+
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction(ignorefunc))
26+
27+
testcases := []struct {
28+
name string
29+
limit int
30+
total int
31+
expected int
32+
}{
33+
{
34+
"total is zero",
35+
10,
36+
0,
37+
0,
38+
},
39+
{
40+
"total is one",
41+
10,
42+
1,
43+
1,
44+
},
45+
{
46+
"limit half of total",
47+
10,
48+
20,
49+
20,
50+
},
51+
{
52+
"limit equals total",
53+
20,
54+
20,
55+
20,
56+
},
57+
{
58+
"limit higher than total",
59+
20,
60+
3,
61+
3,
62+
},
63+
{
64+
"high total returns expected",
65+
5,
66+
100,
67+
100,
68+
},
69+
}
70+
71+
for _, tt := range testcases {
72+
t.Run(tt.name, func(t *testing.T) {
73+
mockstore, _ := mock.New(tt.total)
74+
assetIterator := newTestAssetIterator(mockstore)
75+
pauser := NewPauser()
76+
var got int
77+
78+
var syncWG sync.WaitGroup
79+
80+
syncWG.Add(1)
81+
go func() {
82+
defer syncWG.Done()
83+
for range assetIterator.Channel() {
84+
got++
85+
}
86+
87+
assert.Equal(t, tt.expected, got)
88+
}()
89+
90+
assetIterator.IterInBatches(context.TODO(), tt.limit, pauser)
91+
syncWG.Wait()
92+
})
93+
}
94+
}

0 commit comments

Comments
 (0)