Skip to content

Commit 155dc4e

Browse files
authored
Flowlogs: make calico-node to fetch flow logs from a node (#10144)
1 parent 9f70be8 commit 155dc4e

30 files changed

+878
-174
lines changed

api/pkg/apis/projectcalico/v3/felixconfig.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,10 @@ type FelixConfigurationSpec struct {
835835
// FlowLogGoldmaneServer is the flow server endpoint to which flow data should be published.
836836
FlowLogsGoldmaneServer *string `json:"flowLogsGoldmaneServer,omitempty"`
837837

838+
// FlowLogsLocalReporter configures local unix socket for reporting flow data from each node. [Default: Disabled]
839+
// +kubebuilder:validation:Enum=Disabled;Enabled
840+
FlowLogsLocalReporter *string `json:"flowLogsLocalReporter,omitempty"`
841+
838842
// BPFProfiling controls profiling of BPF programs. At the monent, it can be
839843
// Disabled or Enabled. [Default: Disabled]
840844
//+kubebuilder:validation:Enum=Enabled;Disabled

api/pkg/apis/projectcalico/v3/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/pkg/openapi/generated.openapi.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

felix/collector/dpstatshelper.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/projectcalico/calico/felix/calc"
2121
"github.com/projectcalico/calico/felix/collector/flowlog"
2222
"github.com/projectcalico/calico/felix/collector/goldmane"
23+
"github.com/projectcalico/calico/felix/collector/local"
2324
"github.com/projectcalico/calico/felix/collector/types"
2425
"github.com/projectcalico/calico/felix/config"
2526
"github.com/projectcalico/calico/felix/rules"
@@ -29,6 +30,7 @@ import (
2930
const (
3031
// Log dispatcher names
3132
FlowLogsGoldmaneReporterName = "goldmane"
33+
FlowLogsLocalReporterName = "socket"
3234
)
3335

3436
// New creates the required dataplane stats collector, reporters and aggregators.
@@ -72,6 +74,12 @@ func New(
7274
dispatchers[FlowLogsGoldmaneReporterName] = gd
7375
}
7476
}
77+
if configParams.FlowLogsLocalReporterEnabled() {
78+
log.Infof("Creating local Flow Logs Reporter with address %v", local.SocketAddress)
79+
nd := local.NewReporter()
80+
dispatchers[FlowLogsLocalReporterName] = nd
81+
}
82+
7583
if len(dispatchers) > 0 {
7684
log.Info("Creating Flow Logs Reporter")
7785
cw := flowlog.NewReporter(dispatchers, configParams.FlowLogsFlushInterval, healthAggregator)
@@ -84,24 +92,35 @@ func New(
8492

8593
// configureFlowAggregation adds appropriate aggregators to the FlowLogReporter, depending on configuration.
8694
func configureFlowAggregation(configParams *config.Config, fr *flowlog.FlowLogReporter) {
95+
// Set up aggregator for goldmane reporter.
8796
if configParams.FlowLogsGoldmaneServer != "" {
88-
log.Info("Creating golemane Aggregator for allowed")
89-
gaa := flowlog.NewAggregator().
90-
DisplayDebugTraceLogs(configParams.FlowLogsCollectorDebugTrace).
91-
IncludeLabels(true).
92-
IncludePolicies(true).
93-
IncludeService(true).
94-
ForAction(rules.RuleActionAllow)
97+
log.Info("Creating goldmane Aggregator for allowed")
98+
gaa := defaultFlowAggregator(rules.RuleActionAllow, configParams.FlowLogsCollectorDebugTrace)
9599
log.Info("Adding Flow Logs Aggregator (allowed) for goldmane")
96100
fr.AddAggregator(gaa, []string{FlowLogsGoldmaneReporterName})
97101
log.Info("Creating goldmane Aggregator for denied")
98-
gad := flowlog.NewAggregator().
99-
DisplayDebugTraceLogs(configParams.FlowLogsCollectorDebugTrace).
100-
IncludeLabels(true).
101-
IncludePolicies(true).
102-
IncludeService(true).
103-
ForAction(rules.RuleActionDeny)
102+
gad := defaultFlowAggregator(rules.RuleActionDeny, configParams.FlowLogsCollectorDebugTrace)
104103
log.Info("Adding Flow Logs Aggregator (denied) for goldmane")
105104
fr.AddAggregator(gad, []string{FlowLogsGoldmaneReporterName})
106105
}
106+
// Set up aggregator for local socket reporter.
107+
if configParams.FlowLogsLocalReporterEnabled() {
108+
log.Info("Creating local socket Aggregator for allowed")
109+
gaa := defaultFlowAggregator(rules.RuleActionAllow, configParams.FlowLogsCollectorDebugTrace)
110+
log.Info("Adding Flow Logs Aggregator (allowed) for local socket")
111+
fr.AddAggregator(gaa, []string{FlowLogsLocalReporterName})
112+
log.Info("Creating local socket Aggregator for denied")
113+
gad := defaultFlowAggregator(rules.RuleActionDeny, configParams.FlowLogsCollectorDebugTrace)
114+
log.Info("Adding Flow Logs Aggregator (denied) for local socket")
115+
fr.AddAggregator(gad, []string{FlowLogsLocalReporterName})
116+
}
117+
}
118+
119+
func defaultFlowAggregator(forAction rules.RuleAction, traceEnabled bool) *flowlog.Aggregator {
120+
return flowlog.NewAggregator().
121+
DisplayDebugTraceLogs(traceEnabled).
122+
IncludeLabels(true).
123+
IncludePolicies(true).
124+
IncludeService(true).
125+
ForAction(forAction)
107126
}

felix/collector/goldmane/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (g *GoldmaneReporter) Report(logSlice any) error {
6767
logrus.WithField("num", len(logs)).Debug("Dispatching flow logs to goldmane")
6868
}
6969
for _, l := range logs {
70-
g.client.Push(convertFlowlogToGoldmane(l))
70+
g.client.Push(ConvertFlowlogToGoldmane(l))
7171
}
7272
default:
7373
logrus.Panic("Unexpected kind of log dispatcher")
@@ -115,7 +115,7 @@ func convertAction(a flowlog.Action) proto.Action {
115115
return proto.Action_ActionUnspecified
116116
}
117117

118-
func convertFlowlogToGoldmane(fl *flowlog.FlowLog) *types.Flow {
118+
func ConvertFlowlogToGoldmane(fl *flowlog.FlowLog) *types.Flow {
119119
return &types.Flow{
120120
Key: types.NewFlowKey(
121121
&types.FlowKeySource{

felix/collector/local/reporter.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package local
16+
17+
import (
18+
"context"
19+
"os"
20+
"sync"
21+
"time"
22+
23+
"github.com/sirupsen/logrus"
24+
25+
"github.com/projectcalico/calico/felix/collector/flowlog"
26+
"github.com/projectcalico/calico/felix/collector/goldmane"
27+
"github.com/projectcalico/calico/goldmane/pkg/client"
28+
)
29+
30+
const (
31+
checkLocalSocketTimer = time.Second * 10
32+
)
33+
34+
type LocalSocketReporter struct {
35+
client *client.FlowClient
36+
clientLock sync.RWMutex
37+
clientCancel context.CancelFunc
38+
once sync.Once
39+
}
40+
41+
func NewReporter() *LocalSocketReporter {
42+
return &LocalSocketReporter{}
43+
}
44+
45+
func (l *LocalSocketReporter) Start() error {
46+
var err error
47+
l.once.Do(func() {
48+
go l.run()
49+
})
50+
return err
51+
}
52+
53+
func (l *LocalSocketReporter) run() {
54+
for {
55+
if _, err := os.Stat(SocketPath); err == nil {
56+
l.mayStartClient()
57+
} else {
58+
l.mayStopClient()
59+
}
60+
time.Sleep(checkLocalSocketTimer)
61+
}
62+
}
63+
64+
func (l *LocalSocketReporter) clientIsNil() bool {
65+
l.clientLock.RLock()
66+
defer l.clientLock.RUnlock()
67+
return l.client == nil
68+
}
69+
70+
func (l *LocalSocketReporter) mayStartClient() {
71+
// If local socket is already setup, do not try to set it up again.
72+
if !l.clientIsNil() {
73+
return
74+
}
75+
76+
var err error
77+
l.clientLock.Lock()
78+
defer l.clientLock.Unlock()
79+
l.client, err = client.NewFlowClient(SocketAddress, "", "", "")
80+
if err != nil {
81+
logrus.WithError(err).Warn("Failed to create local socket client")
82+
return
83+
}
84+
logrus.Info("Created local socket client")
85+
ctx, cancel := context.WithCancel(context.Background())
86+
l.clientCancel = cancel
87+
l.client.Connect(ctx)
88+
}
89+
90+
func (l *LocalSocketReporter) mayStopClient() {
91+
// If local socket is already closed, do not try to close it again.
92+
if l.clientIsNil() {
93+
return
94+
}
95+
96+
l.clientLock.Lock()
97+
defer l.clientLock.Unlock()
98+
l.clientCancel()
99+
l.client = nil
100+
logrus.Info("Destroyed local socket client")
101+
}
102+
103+
func (n *LocalSocketReporter) Report(logSlice any) error {
104+
switch logs := logSlice.(type) {
105+
case []*flowlog.FlowLog:
106+
if logrus.IsLevelEnabled(logrus.DebugLevel) {
107+
logrus.WithField("num", len(logs)).Debug("Dispatching flow logs to local socket")
108+
}
109+
for _, l := range logs {
110+
n.clientLock.RLock()
111+
if n.client != nil {
112+
n.client.Push(goldmane.ConvertFlowlogToGoldmane(l))
113+
}
114+
n.clientLock.RUnlock()
115+
}
116+
default:
117+
logrus.Panic("Unexpected kind of log dispatcher")
118+
}
119+
return nil
120+
}

0 commit comments

Comments
 (0)