Skip to content

Commit 756c80b

Browse files
Add dynamic enable/disable of flow emission (#9995)
1 parent 121f96a commit 756c80b

File tree

14 files changed

+698
-217
lines changed

14 files changed

+698
-217
lines changed

goldmane/pkg/aggregator/aggregator.go

+14-21
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ type LogAggregator struct {
8787
streamRequests chan streamRequest
8888

8989
// sink is a sink to send aggregated flows to.
90-
sink Sink
90+
sink bucketing.Sink
91+
92+
// sinkChan allows setting the sink asynchronously.
93+
sinkChan chan bucketing.Sink
9194

9295
// recvChan is the channel to receive flow updates on.
9396
recvChan chan *proto.FlowUpdate
@@ -124,6 +127,7 @@ func NewLogAggregator(opts ...Option) *LogAggregator {
124127
listRequests: make(chan listRequest),
125128
streamRequests: make(chan streamRequest),
126129
recvChan: make(chan *proto.FlowUpdate, channelDepth),
130+
sinkChan: make(chan bucketing.Sink, 10),
127131
rolloverFunc: time.After,
128132
bucketsToAggregate: 20,
129133
pushIndex: 30,
@@ -195,7 +199,7 @@ func (a *LogAggregator) Run(startTime int64) {
195199
a.handleFlowUpdate(upd)
196200
case <-rolloverCh:
197201
rolloverCh = a.rolloverFunc(a.rollover())
198-
a.maybeEmitFlows()
202+
a.buckets.EmitFlowCollections(a.sink)
199203
case req := <-a.listRequests:
200204
req.respCh <- a.queryFlows(req.req)
201205
case req := <-a.streamRequests:
@@ -204,13 +208,21 @@ func (a *LogAggregator) Run(startTime int64) {
204208
a.backfill(stream, req.req)
205209
case id := <-a.streams.closedStreams():
206210
a.streams.close(id)
211+
case sink := <-a.sinkChan:
212+
logrus.WithField("sink", sink).Info("Setting aggregator sink")
213+
a.sink = sink
214+
a.buckets.EmitFlowCollections(a.sink)
207215
case <-a.done:
208216
logrus.Warn("Aggregator shutting down")
209217
return
210218
}
211219
}
212220
}
213221

222+
func (a *LogAggregator) SetSink(s bucketing.Sink) {
223+
a.sinkChan <- s
224+
}
225+
214226
// Receive is used to send a flow update to the aggregator.
215227
func (a *LogAggregator) Receive(f *proto.FlowUpdate) {
216228
timeout := time.After(5 * time.Second)
@@ -222,25 +234,6 @@ func (a *LogAggregator) Receive(f *proto.FlowUpdate) {
222234
}
223235
}
224236

225-
func (a *LogAggregator) maybeEmitFlows() {
226-
if a.sink == nil {
227-
logrus.Debug("No sink configured, skip flow emission")
228-
return
229-
}
230-
231-
flows := a.buckets.FlowCollection()
232-
if flows == nil {
233-
// We've already pushed this bucket, so we can skip it. We'll emit the next flow once
234-
// bucketsToAggregate buckets have been rolled over.
235-
logrus.Debug("Delaying flow emission, no new flows to emit")
236-
return
237-
}
238-
239-
if len(flows.Flows) > 0 {
240-
a.sink.Receive(flows)
241-
}
242-
}
243-
244237
// Stream returns a new Stream from the aggregator. It uses a channel to synchronously request the stream
245238
// from the aggregator.
246239
func (a *LogAggregator) Stream(req *proto.FlowStreamRequest) (*Stream, error) {

0 commit comments

Comments (0)