Skip to content

Flowlogs: make calico-node to fetch flow logs from a node #10144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d98f9ed
first commit
mazdakn Apr 4, 2025
914c030
first prototype
mazdakn Apr 4, 2025
f6675fa
udapte
mazdakn Apr 4, 2025
813a16d
Add more
mazdakn Apr 5, 2025
1b4462b
update
mazdakn Apr 9, 2025
0a91e70
more
mazdakn Apr 10, 2025
71437b5
Cleanup
mazdakn Apr 10, 2025
71e3bd2
Fix linter
mazdakn Apr 10, 2025
c20ee50
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 10, 2025
57cae89
remove goldmane server mock
mazdakn Apr 10, 2025
2013ced
Fix FVs
mazdakn Apr 11, 2025
7c21315
clean up
mazdakn Apr 11, 2025
8524f88
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 11, 2025
744bc25
some more change
mazdakn Apr 11, 2025
fd1c83d
more
mazdakn Apr 11, 2025
e6e3ebf
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 11, 2025
d6d9c71
Add FV
mazdakn Apr 11, 2025
f4931da
Final cleanup
mazdakn Apr 11, 2025
5b8fc24
Update Fv
mazdakn Apr 12, 2025
95af41d
update comment
mazdakn Apr 12, 2025
f931e41
Fix FVs
mazdakn Apr 12, 2025
c479545
improve FV
mazdakn Apr 12, 2025
dfbd71c
Update node/cmd/calico-node/main.go
mazdakn Apr 14, 2025
a2c0168
Update felix/collector/goldmane/node_server.go
mazdakn Apr 14, 2025
37b3f55
markup
mazdakn Apr 14, 2025
a4967ae
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 14, 2025
fe90de6
Fix node
mazdakn Apr 14, 2025
fc08717
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 15, 2025
4b71e40
use a felixconfig
mazdakn Apr 15, 2025
2ae5a5e
few changes
mazdakn Apr 15, 2025
37376a6
update
mazdakn Apr 16, 2025
f5cb375
update
mazdakn Apr 16, 2025
0cc4e5a
change config option
mazdakn Apr 16, 2025
a87bd9a
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 16, 2025
9f3e845
change names
mazdakn Apr 16, 2025
0ce48d9
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 16, 2025
d361089
rename to local socket
mazdakn Apr 17, 2025
19c43e1
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/pkg/apis/projectcalico/v3/felixconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,10 @@ type FelixConfigurationSpec struct {
// FlowLogGoldmaneServer is the flow server endpoint to which flow data should be published.
FlowLogsGoldmaneServer *string `json:"flowLogsGoldmaneServer,omitempty"`

// FlowLogsLocalReporter configures local unix socket for reporting flow data from each node. [Default: Disabled]
// +kubebuilder:validation:Enum=Disabled;Enabled
FlowLogsLocalReporter *string `json:"flowLogsLocalReporter,omitempty"`

// BPFProfiling controls profiling of BPF programs. At the monent, it can be
// Disabled or Enabled. [Default: Disabled]
//+kubebuilder:validation:Enum=Enabled;Disabled
Expand Down
5 changes: 5 additions & 0 deletions api/pkg/apis/projectcalico/v3/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/pkg/openapi/generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 32 additions & 13 deletions felix/collector/dpstatshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/projectcalico/calico/felix/calc"
"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/felix/collector/local"
"github.com/projectcalico/calico/felix/collector/types"
"github.com/projectcalico/calico/felix/config"
"github.com/projectcalico/calico/felix/rules"
Expand All @@ -29,6 +30,7 @@ import (
const (
// Log dispatcher names
FlowLogsGoldmaneReporterName = "goldmane"
FlowLogsLocalReporterName = "socket"
)

// New creates the required dataplane stats collector, reporters and aggregators.
Expand Down Expand Up @@ -72,6 +74,12 @@ func New(
dispatchers[FlowLogsGoldmaneReporterName] = gd
}
}
if configParams.FlowLogsLocalReporterEnabled() {
log.Infof("Creating local Flow Logs Reporter with address %v", local.SocketAddress)
nd := local.NewReporter()
dispatchers[FlowLogsLocalReporterName] = nd
}

if len(dispatchers) > 0 {
log.Info("Creating Flow Logs Reporter")
cw := flowlog.NewReporter(dispatchers, configParams.FlowLogsFlushInterval, healthAggregator)
Expand All @@ -84,24 +92,35 @@ func New(

// configureFlowAggregation adds appropriate aggregators to the FlowLogReporter, depending on configuration.
func configureFlowAggregation(configParams *config.Config, fr *flowlog.FlowLogReporter) {
// Set up aggregator for goldmane reporter.
if configParams.FlowLogsGoldmaneServer != "" {
log.Info("Creating golemane Aggregator for allowed")
gaa := flowlog.NewAggregator().
DisplayDebugTraceLogs(configParams.FlowLogsCollectorDebugTrace).
IncludeLabels(true).
IncludePolicies(true).
IncludeService(true).
ForAction(rules.RuleActionAllow)
log.Info("Creating goldmane Aggregator for allowed")
gaa := defaultFlowAggregator(rules.RuleActionAllow, configParams.FlowLogsCollectorDebugTrace)
log.Info("Adding Flow Logs Aggregator (allowed) for goldmane")
fr.AddAggregator(gaa, []string{FlowLogsGoldmaneReporterName})
log.Info("Creating goldmane Aggregator for denied")
gad := flowlog.NewAggregator().
DisplayDebugTraceLogs(configParams.FlowLogsCollectorDebugTrace).
IncludeLabels(true).
IncludePolicies(true).
IncludeService(true).
ForAction(rules.RuleActionDeny)
gad := defaultFlowAggregator(rules.RuleActionDeny, configParams.FlowLogsCollectorDebugTrace)
log.Info("Adding Flow Logs Aggregator (denied) for goldmane")
fr.AddAggregator(gad, []string{FlowLogsGoldmaneReporterName})
}
// Set up aggregator for local socket reporter.
if configParams.FlowLogsLocalReporterEnabled() {
log.Info("Creating local socket Aggregator for allowed")
gaa := defaultFlowAggregator(rules.RuleActionAllow, configParams.FlowLogsCollectorDebugTrace)
log.Info("Adding Flow Logs Aggregator (allowed) for local socket")
fr.AddAggregator(gaa, []string{FlowLogsLocalReporterName})
log.Info("Creating local socket Aggregator for denied")
gad := defaultFlowAggregator(rules.RuleActionDeny, configParams.FlowLogsCollectorDebugTrace)
log.Info("Adding Flow Logs Aggregator (denied) for local socket")
fr.AddAggregator(gad, []string{FlowLogsLocalReporterName})
}
}

func defaultFlowAggregator(forAction rules.RuleAction, traceEnabled bool) *flowlog.Aggregator {
return flowlog.NewAggregator().
DisplayDebugTraceLogs(traceEnabled).
IncludeLabels(true).
IncludePolicies(true).
IncludeService(true).
ForAction(forAction)
}
4 changes: 2 additions & 2 deletions felix/collector/goldmane/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (g *GoldmaneReporter) Report(logSlice any) error {
logrus.WithField("num", len(logs)).Debug("Dispatching flow logs to goldmane")
}
for _, l := range logs {
g.client.Push(convertFlowlogToGoldmane(l))
g.client.Push(ConvertFlowlogToGoldmane(l))
}
default:
logrus.Panic("Unexpected kind of log dispatcher")
Expand Down Expand Up @@ -115,7 +115,7 @@ func convertAction(a flowlog.Action) proto.Action {
return proto.Action_ActionUnspecified
}

func convertFlowlogToGoldmane(fl *flowlog.FlowLog) *types.Flow {
func ConvertFlowlogToGoldmane(fl *flowlog.FlowLog) *types.Flow {
return &types.Flow{
Key: types.NewFlowKey(
&types.FlowKeySource{
Expand Down
120 changes: 120 additions & 0 deletions felix/collector/local/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"context"
"os"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/goldmane/pkg/client"
)

const (
checkLocalSocketTimer = time.Second * 10
)

type LocalSocketReporter struct {
client *client.FlowClient
clientLock sync.RWMutex
clientCancel context.CancelFunc
once sync.Once
}

func NewReporter() *LocalSocketReporter {
return &LocalSocketReporter{}
}

func (l *LocalSocketReporter) Start() error {
var err error
l.once.Do(func() {
go l.run()
})
return err
}

func (l *LocalSocketReporter) run() {
for {
if _, err := os.Stat(SocketPath); err == nil {
l.mayStartClient()
} else {
l.mayStopClient()
}
time.Sleep(checkLocalSocketTimer)
}
}

func (l *LocalSocketReporter) clientIsNil() bool {
l.clientLock.RLock()
defer l.clientLock.RUnlock()
return l.client == nil
}

func (l *LocalSocketReporter) mayStartClient() {
// If local socket is already setup, do not try to set it up again.
if !l.clientIsNil() {
return
}

var err error
l.clientLock.Lock()
defer l.clientLock.Unlock()
l.client, err = client.NewFlowClient(SocketAddress, "", "", "")
if err != nil {
logrus.WithError(err).Warn("Failed to create local socket client")
return
}
logrus.Info("Created local socket client")
ctx, cancel := context.WithCancel(context.Background())
l.clientCancel = cancel
l.client.Connect(ctx)
}

func (l *LocalSocketReporter) mayStopClient() {
// If local socket is already closed, do not try to close it again.
if l.clientIsNil() {
return
}

l.clientLock.Lock()
defer l.clientLock.Unlock()
l.clientCancel()
l.client = nil
logrus.Info("Destroyed local socket client")
}

func (n *LocalSocketReporter) Report(logSlice any) error {
switch logs := logSlice.(type) {
case []*flowlog.FlowLog:
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithField("num", len(logs)).Debug("Dispatching flow logs to local socket")
}
for _, l := range logs {
n.clientLock.RLock()
if n.client != nil {
n.client.Push(goldmane.ConvertFlowlogToGoldmane(l))
}
n.clientLock.RUnlock()
}
default:
logrus.Panic("Unexpected kind of log dispatcher")
}
return nil
}
Loading