Skip to content

Commit 9adbc5b

Browse files
Goldmane emitter improvements (projectcalico#9982)
* Adjust config model to be in terms of time, not buckets * Reload certificates on change in emitter client * Fix static checks
1 parent cbcacc8 commit 9adbc5b

File tree

5 files changed

+124
-21
lines changed

5 files changed

+124
-21
lines changed

goldmane/pkg/aggregator/aggregator_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -760,8 +760,8 @@ func TestSink(t *testing.T) {
760760
// - The first window we aggregated covered 952-972 (but had no flows)
761761
// - The second window we aggregated covered 972-992 (but had no flows)
762762
// - The third window we aggregated covered 992-1012 (and had flows!)
763-
require.Equal(t, int64(1012), sink.buckets[0].EndTime)
764-
require.Equal(t, int64(992), sink.buckets[0].StartTime)
763+
require.Equal(t, int64(1011), sink.buckets[0].EndTime)
764+
require.Equal(t, int64(991), sink.buckets[0].StartTime)
765765
require.Equal(t, int64(20), sink.buckets[0].EndTime-sink.buckets[0].StartTime)
766766

767767
// Expect the bucket to have aggregated to a single flow.

goldmane/pkg/aggregator/bucketing/bucket_ring.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func NewBucketRing(n, interval int, now int64, opts ...BucketRingOption) *Bucket
130130
oldestBucketStart := time.Unix(newestBucketStart-int64(interval*n), 0)
131131
oldestBucketEnd := time.Unix(oldestBucketStart.Unix()+int64(interval), 0)
132132
ring.buckets[0] = *NewAggregationBucket(oldestBucketStart, oldestBucketEnd)
133-
for i := 0; i < n; i++ {
133+
for range n {
134134
ring.Rollover()
135135
}
136136

@@ -268,18 +268,26 @@ func (r *BucketRing) findBucket(time int64) (int, *AggregationBucket) {
268268
return -1, nil
269269
}
270270

271+
// nowIndex returns the index of the bucket that represents the current time.
272+
// This is different from the head index, which is actually one bucket into the future.
273+
func (r *BucketRing) nowIndex() int {
274+
return r.indexSubtract(r.headIndex, 1)
275+
}
276+
271277
// FlowCollection returns a collection of flows to emit, or nil if we are still waiting for more data.
272278
// The BucketRing builds a FlowCollection by aggregating flow data from across a window of buckets. The window
273279
// is a fixed size (i.e., a fixed number of buckets), and starts a fixed period of time in the past in order to allow
274280
// for statistics to settle down before publishing.
275281
func (r *BucketRing) FlowCollection() *FlowCollection {
276-
// Determine the newest bucket in the aggregation - this is always N buckets back from the head.
277-
endIndex := r.indexSubtract(r.headIndex, r.pushAfter)
282+
// Determine the newest bucket in the aggregation - this is always pushAfter buckets back from "now".
283+
endIndex := r.indexSubtract(r.nowIndex(), r.pushAfter)
278284
startIndex := r.indexSubtract(endIndex, r.bucketsToAggregate)
279285

280286
logrus.WithFields(logrus.Fields{
281287
"startIndex": startIndex,
282288
"endIndex": endIndex,
289+
"startTime": r.buckets[startIndex].StartTime,
290+
"endTime": r.buckets[endIndex].StartTime,
283291
}).Debug("Checking if bucket range should be emitted")
284292

285293
// Check if we're ready to emit. Wait until the oldest bucket in the window has not yet

goldmane/pkg/daemon/daemon.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,17 @@ type Config struct {
6363
// from each node in the cluster.
6464
AggregationWindow time.Duration `json:"aggregation_window" envconfig:"AGGREGATION_WINDOW" default:"15s"`
6565

66-
// The number of buckets to combine when pushing flows to the sink. This can be used to reduce the number
67-
// buckets combined into time-aggregated flows that are sent to the sink.
68-
NumBucketsToCombine int `json:"num_buckets_to_combine" envconfig:"NUM_BUCKETS_TO_COMBINE" default:"20"`
69-
70-
// PushIndex is the index of the bucket which triggers pushing to the emitter. A larger value
71-
// will increase the latency of emitted flows, while a smaller value will cause the emitter to emit
72-
// potentially incomplete flows.
73-
PushIndex int `json:"push_index" envconfig:"PUSH_INDEX" default:"30"`
66+
// EmitAfterSeconds is the number of seconds from flow receipt to wait before a flow becomes eligible to be
67+
// pushed to the emitter. Increasing this value will increase the latency of emitted flows, while a smaller
68+
// value will cause the emitter to emit potentially incomplete flows.
69+
//
70+
// This must be a multiple of the aggregation window.
71+
EmitAfterSeconds int `json:"emit_after_seconds" envconfig:"EMIT_AFTER_SECONDS" default:"30"`
72+
73+
// EmitterAggregationWindow is the duration of time across which flows are aggregated before being pushed to the emitter.
74+
// This must be a multiple of the aggregation window.
75+
// Emitted flows will be aggregated from "now-EmitAfterSeconds-EmitterAggregationWindow" to "now-EmitAfterSeconds".
76+
EmitterAggregationWindow time.Duration `json:"emitter_aggregation_window" envconfig:"EMITTER_AGGREGATION_WINDOW" default:"5m"`
7477
}
7578

7679
func ConfigFromEnv() Config {
@@ -116,8 +119,8 @@ func Run(ctx context.Context, cfg Config) {
116119
// Track options for log aggregator.
117120
aggOpts := []aggregator.Option{
118121
aggregator.WithRolloverTime(cfg.AggregationWindow),
119-
aggregator.WithBucketsToCombine(cfg.NumBucketsToCombine),
120-
aggregator.WithPushIndex(cfg.PushIndex),
122+
aggregator.WithBucketsToCombine(int(cfg.EmitterAggregationWindow.Seconds()) / int(cfg.AggregationWindow.Seconds())),
123+
aggregator.WithPushIndex(cfg.EmitAfterSeconds / int(cfg.AggregationWindow.Seconds())),
121124
}
122125

123126
if cfg.PushURL != "" {

goldmane/pkg/emitter/http_client.go

+83-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ import (
2424
"os"
2525
"time"
2626

27+
"github.com/fsnotify/fsnotify"
2728
"github.com/sirupsen/logrus"
29+
30+
"github.com/projectcalico/calico/lib/std/chanutil"
2831
)
2932

3033
const ContentTypeMultilineJSON = "application/x-ndjson"
@@ -71,20 +74,96 @@ func newHTTPClient(caCert, clientKey, clientCert, serverName string) (*http.Clie
7174
}
7275

7376
func newEmitterClient(url, caCert, clientKey, clientCert, serverName string) (*emitterClient, error) {
77+
// Create an initial HTTP client, and a function to help encapsulate the reload logic.
7478
client, err := newHTTPClient(caCert, clientKey, clientCert, serverName)
7579
if err != nil {
7680
return nil, err
7781
}
78-
return &emitterClient{url: url, client: client}, nil
82+
83+
updChan := make(chan struct{})
84+
getClient := func() (*http.Client, error) {
85+
select {
86+
case _, ok := <-updChan:
87+
if ok {
88+
// Only reload the client if the channel is still open. If the filewatcher
89+
// has been closed, we'll just continue using the existing client as best-effort.
90+
logrus.Info("Reloading client after certificate change")
91+
client, err = newHTTPClient(caCert, clientKey, clientCert, serverName)
92+
if err != nil {
93+
return nil, fmt.Errorf("error reloading CA cert: %s", err)
94+
}
95+
}
96+
default:
97+
// No change, return the existing client.
98+
}
99+
return client, nil
100+
}
101+
102+
if caCert != "" || clientKey != "" || clientCert != "" {
103+
// Start a goroutine to watch for changes to the CA cert file and feed
104+
// them into the update channel.
105+
monitorFn, err := watchFiles(updChan, caCert, clientCert, clientKey)
106+
if err != nil {
107+
return nil, fmt.Errorf("error setting up CA cert file watcher: %s", err)
108+
}
109+
go monitorFn()
110+
}
111+
112+
return &emitterClient{
113+
url: url,
114+
getClient: getClient,
115+
}, nil
116+
}
117+
118+
func watchFiles(updChan chan struct{}, files ...string) (func(), error) {
119+
fileWatcher, err := fsnotify.NewWatcher()
120+
if err != nil {
121+
return nil, fmt.Errorf("error creating file watcher: %s", err)
122+
}
123+
for _, file := range files {
124+
if err := fileWatcher.Add(file); err != nil {
125+
logrus.WithError(err).Warn("Error watching file for changes")
126+
continue
127+
}
128+
logrus.WithField("file", file).Debug("Watching file for changes")
129+
}
130+
131+
return func() {
132+
// If we exit this function, make sure to close the file watcher and update channel.
133+
defer fileWatcher.Close()
134+
defer close(updChan)
135+
defer logrus.Info("File watcher closed")
136+
for {
137+
select {
138+
case event, ok := <-fileWatcher.Events:
139+
if !ok {
140+
return
141+
}
142+
if event.Op&fsnotify.Write == fsnotify.Write {
143+
logrus.WithField("file", event.Name).Info("File changed, triggering update")
144+
_ = chanutil.WriteNonBlocking(updChan, struct{}{})
145+
}
146+
case err, ok := <-fileWatcher.Errors:
147+
if !ok {
148+
return
149+
}
150+
logrus.Errorf("error watching CA cert file: %s", err)
151+
}
152+
}
153+
}, nil
79154
}
80155

81156
type emitterClient struct {
82-
url string
83-
client *http.Client
157+
url string
158+
getClient func() (*http.Client, error)
84159
}
85160

86161
func (e *emitterClient) Post(body io.Reader) error {
87-
resp, err := e.client.Post(e.url, ContentTypeMultilineJSON, body)
162+
client, err := e.getClient()
163+
if err != nil {
164+
return err
165+
}
166+
resp, err := client.Post(e.url, ContentTypeMultilineJSON, body)
88167
if err != nil {
89168
return err
90169
}

lib/std/chanutil/chan.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"time"
77
)
88

9-
var ErrChannelClosed = errors.New("channel closed")
10-
var ErrDeadlineExceeded = errors.New("deadline exceeded")
9+
var (
10+
ErrChannelClosed = errors.New("channel closed")
11+
ErrDeadlineExceeded = errors.New("deadline exceeded")
12+
)
1113

1214
// Read reads from the given channel and blocks until either an object is pulled off the channel, the context
1315
// is done, or the channel is closed.
@@ -45,3 +47,14 @@ func ReadWithDeadline[E any](ctx context.Context, ch <-chan E, duration time.Dur
4547
return def, ErrDeadlineExceeded
4648
}
4749
}
50+
51+
// WriteNonBlocking writes to the given channel in a non-blocking manner. It returns true if the write was successful,
52+
// and false otherwise.
53+
func WriteNonBlocking[E any](ch chan<- E, v E) bool {
54+
select {
55+
case ch <- v:
56+
return true
57+
default:
58+
return false
59+
}
60+
}

0 commit comments

Comments
 (0)