Skip to content

Flowlogs: make calico-node to fetch flow logs from a node #10144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d98f9ed
first commit
mazdakn Apr 4, 2025
914c030
first prototype
mazdakn Apr 4, 2025
f6675fa
udapte
mazdakn Apr 4, 2025
813a16d
Add more
mazdakn Apr 5, 2025
1b4462b
update
mazdakn Apr 9, 2025
0a91e70
more
mazdakn Apr 10, 2025
71437b5
Cleanup
mazdakn Apr 10, 2025
71e3bd2
Fix linter
mazdakn Apr 10, 2025
c20ee50
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 10, 2025
57cae89
remove goldmane server mock
mazdakn Apr 10, 2025
2013ced
Fix FVs
mazdakn Apr 11, 2025
7c21315
clean up
mazdakn Apr 11, 2025
8524f88
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 11, 2025
744bc25
some more change
mazdakn Apr 11, 2025
fd1c83d
more
mazdakn Apr 11, 2025
e6e3ebf
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 11, 2025
d6d9c71
Add FV
mazdakn Apr 11, 2025
f4931da
Final cleanup
mazdakn Apr 11, 2025
5b8fc24
Update Fv
mazdakn Apr 12, 2025
95af41d
update comment
mazdakn Apr 12, 2025
f931e41
Fix FVs
mazdakn Apr 12, 2025
c479545
improve FV
mazdakn Apr 12, 2025
dfbd71c
Update node/cmd/calico-node/main.go
mazdakn Apr 14, 2025
a2c0168
Update felix/collector/goldmane/node_server.go
mazdakn Apr 14, 2025
37b3f55
markup
mazdakn Apr 14, 2025
a4967ae
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 14, 2025
fe90de6
Fix node
mazdakn Apr 14, 2025
fc08717
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 15, 2025
4b71e40
use a felixconfig
mazdakn Apr 15, 2025
2ae5a5e
few changes
mazdakn Apr 15, 2025
37376a6
update
mazdakn Apr 16, 2025
f5cb375
update
mazdakn Apr 16, 2025
0cc4e5a
change config option
mazdakn Apr 16, 2025
a87bd9a
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 16, 2025
9f3e845
change names
mazdakn Apr 16, 2025
0ce48d9
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 16, 2025
d361089
rename to local socket
mazdakn Apr 17, 2025
19c43e1
Merge remote-tracking branch 'open-source/master' into tool-flowlog
mazdakn Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion felix/collector/goldmane/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type GoldmaneReporter struct {
address string
client *client.FlowClient
once sync.Once

// Fields related to goldmane node socket
mayReportToNodeSocket bool
nodeClient *client.FlowClient
nodeClientLock sync.RWMutex
}

func NewReporter(addr, cert, key, ca string) (*GoldmaneReporter, error) {
Expand All @@ -48,6 +53,9 @@ func NewReporter(addr, cert, key, ca string) (*GoldmaneReporter, error) {
return &GoldmaneReporter{
address: addr,
client: cli,

// Do not send flowlogs to node socket, if goldmane address set via FelixConfig is equal to node socket
mayReportToNodeSocket: addr != NodeSocketAddress,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much work would it be to lift this out of the Goldmane reporter and into its own Reporter, and then adding the ability to run multiple reporters + dynamically add / remove them?

It seems to me that we are going to want the ability to have multiple Reporters running at once - it would be cool to be able to use this e.g., even if Goldmane wasn't configured.

Copy link
Member Author

@mazdakn mazdakn Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not much! Collector already supports multiple reporters.
But if we want to be able to run this without goldmane enabled, we should introduce a config option to enabled/disable it. Something like FlowLogsGoldmaneNodeServer with Enabled and Disabled values. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like a preferrable route to me, if it's not too much work. A new LocalFlowSocket: Enabled or something would make sense as the config option name probably?

}, nil
}

Expand All @@ -56,18 +64,79 @@ func (g *GoldmaneReporter) Start() error {
g.once.Do(func() {
// We don't wait for the initial connection to start so we don't block the caller.
g.client.Connect(context.Background())

if g.mayReportToNodeSocket {
go g.nodeSocketReporter()
}
})
return err
}

func (g *GoldmaneReporter) nodeSocketReporter() {
for {
if NodeSocketExists() {
g.mayStartNodeSocketReporter()
} else {
g.mayStopNodeSocketReporter()
}
time.Sleep(time.Second * 10)
}
}

func (g *GoldmaneReporter) nodeClientIsNil() bool {
g.nodeClientLock.RLock()
defer g.nodeClientLock.RUnlock()
return g.nodeClient == nil
}

func (g *GoldmaneReporter) mayStartNodeSocketReporter() {
// If node socket is already setup, do not try to set it up again.
if !g.nodeClientIsNil() {
return
}

var err error
g.nodeClientLock.Lock()
defer g.nodeClientLock.Unlock()
g.nodeClient, err = client.NewFlowClient(NodeSocketAddress, "", "", "")
if err != nil {
logrus.WithError(err).Warn("Failed to create goldmane unix client")
return
}
logrus.Info("Created goldmane node client")
g.nodeClient.Connect(context.Background())
}

func (g *GoldmaneReporter) mayStopNodeSocketReporter() {
// If node socket is already closed, do not try to close it again.
if g.nodeClientIsNil() {
return
}

g.nodeClientLock.Lock()
defer g.nodeClientLock.Unlock()
g.nodeClient = nil
logrus.Info("Destroyed goldmane node client")
}

func (g *GoldmaneReporter) Report(logSlice any) error {
switch logs := logSlice.(type) {
case []*flowlog.FlowLog:
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithField("num", len(logs)).Debug("Dispatching flow logs to goldmane")
}
for _, l := range logs {
g.client.Push(convertFlowlogToGoldmane(l))
goldmaneLog := convertFlowlogToGoldmane(l)
g.client.Push(goldmaneLog)

if g.mayReportToNodeSocket {
// If goldmane node server exists, also send flowlogs to it.
g.nodeClientLock.RLock()
if g.nodeClient != nil {
g.nodeClient.Push(goldmaneLog)
}
g.nodeClientLock.RUnlock()
}
}
default:
logrus.Panic("Unexpected kind of log dispatcher")
Expand Down
194 changes: 194 additions & 0 deletions felix/collector/goldmane/node_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package goldmane

import (
"context"
"fmt"
"net"
"os"
"path"
"sync"
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"

"github.com/projectcalico/calico/goldmane/pkg/server"
"github.com/projectcalico/calico/goldmane/pkg/types"
)

const (
NodeSocketDir = "/var/log/calico/flowlogs"
NodeSocketName = "goldmane.sock"
)

var (
NodeSocketPath = path.Join(NodeSocketDir, NodeSocketName)
NodeSocketAddress = fmt.Sprintf("unix://%v", NodeSocketPath)
)

type flowStore struct {
lock sync.RWMutex
flows []*types.Flow
}

func newFlowStore() *flowStore {
return &flowStore{}
}

func (s *flowStore) Receive(f *types.Flow) {
s.lock.Lock()
defer s.lock.Unlock()
s.flows = append(s.flows, f)
}

func (s *flowStore) List() []*types.Flow {
s.lock.RLock()
defer s.lock.RUnlock()
return s.flows
}

func (s *flowStore) Flush() {
s.lock.Lock()
defer s.lock.Unlock()
s.flows = nil
}

func (s *flowStore) ListAndFlush() []*types.Flow {
s.lock.Lock()
defer s.lock.Unlock()
flows := s.flows
s.flows = nil
return flows
}

type NodeServer struct {
store *flowStore
grpcServer *grpc.Server
once sync.Once

// In Calico node, the address of unix socket is always /var/log/calico/flowlogs/goldmane.sock.
// However, NodeServer is also used by Felix FVs, where the code is executed outside of Calico Node. In this case,
// unix socket is created in host filesystem, and instead mounted at the mentioned path in Felix containers.
dir string
}

func NewNodeServer(dir string) *NodeServer {
nodeServer := NodeServer{
dir: dir,
grpcServer: grpc.NewServer(),
store: newFlowStore(),
}
col := server.NewFlowCollector(nodeServer.store)
col.RegisterWith(nodeServer.grpcServer)
return &nodeServer
}

func (s *NodeServer) Run() error {
var err error
err = ensureNodeSocketDirExists(s.dir)
if err != nil {
logrus.WithError(err).Error("Failed to create goldmane node socket")
return err
}

s.once.Do(func() {
var l net.Listener
sockAddr := s.Address()
l, err = net.Listen("unix", sockAddr)
if err != nil {
return
}
logrus.WithField("address", sockAddr).Info("Running goldmane node server")
go func() {
err = s.grpcServer.Serve(l)
if err != nil {
return
}
}()
})
return nil
}

func (s *NodeServer) Watch(ctx context.Context, num int, processFlow func(*types.Flow)) {
infinitLoop := num < 0
var count int
logrus.Debug("Starting to watch goldmane node socket")
for {
if ctx.Err() != nil ||
(!infinitLoop && count >= num) {
logrus.Debug("Stopped watching goldmane node socket")
return
}

flows := s.ListAndFlush()
for _, f := range flows {
processFlow(f)
}
count = count + len(flows)
time.Sleep(time.Second)
}
}

func (s *NodeServer) Stop() {
cleanupNodeSocket(s.Address())
s.grpcServer.Stop()
}

func (s *NodeServer) List() []*types.Flow {
return s.store.List()
}

func (s *NodeServer) Flush() {
s.store.Flush()
}

func (s *NodeServer) ListAndFlush() []*types.Flow {
return s.store.ListAndFlush()
}

func (s *NodeServer) Address() string {
return path.Join(s.dir, NodeSocketName)
}

func ensureNodeSocketDirExists(dir string) error {
logrus.Debug("Checking if goldmane node socket exists.")
if _, err := os.Stat(dir); os.IsNotExist(err) {
logrus.WithField("directory", dir).Debug("Goldmane node socket directory does not exist")
err := os.MkdirAll(dir, 0o600)
if err != nil {
logrus.WithError(err).WithField("directory", dir).Error("Failed to create node socket directory")
return err
}
logrus.WithField("directory", dir).Debug("Created goldmane node socket directory")
}
return nil
}

func cleanupNodeSocket(addr string) {
if NodeSocketExists() {
err := os.Remove(addr)
if err != nil {
logrus.WithError(err).WithField("address", addr).Errorf("Failed to remove goldmane node socket")
}
}
}

func NodeSocketExists() bool {
_, err := os.Stat(NodeSocketPath)
// In case of any error, return false
return err == nil
}
7 changes: 4 additions & 3 deletions felix/fv/flow_logs_goldmane_staged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/projectcalico/calico/felix/bpf/conntrack"
"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/felix/collector/types/endpoint"
"github.com/projectcalico/calico/felix/collector/types/tuple"
"github.com/projectcalico/calico/felix/fv/connectivity"
Expand Down Expand Up @@ -100,7 +101,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log with stag

opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "5"
opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.NodeSocketAddress

// Start felix instances.
tc, client = infrastructure.StartNNodeTopology(2, opts, infra)
Expand Down Expand Up @@ -823,7 +824,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ aggregation of flow log wit

opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "5"
opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.NodeSocketAddress

// Start felix instances.
tc, client = infrastructure.StartNNodeTopology(2, opts, infra)
Expand Down Expand Up @@ -1572,7 +1573,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log with stag

opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "3"
opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.NodeSocketAddress

// Start felix instances.
tc, client = infrastructure.StartNNodeTopology(2, opts, infra)
Expand Down
5 changes: 3 additions & 2 deletions felix/fv/flow_logs_goldmane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/projectcalico/calico/felix/bpf/conntrack"
"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/felix/collector/types/endpoint"
"github.com/projectcalico/calico/felix/collector/types/tuple"
"github.com/projectcalico/calico/felix/fv/connectivity"
Expand Down Expand Up @@ -105,7 +106,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log tests", [

opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "2"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.NodeSocketAddress
})

JustBeforeEach(func() {
Expand Down Expand Up @@ -591,7 +592,7 @@ var _ = infrastructure.DatastoreDescribe("goldmane flow log ipv6 tests", []apico
opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "2"
opts.ExtraEnvVars["FELIX_IPV6SUPPORT"] = "true"
opts.ExtraEnvVars["FELIX_DefaultEndpointToHostAction"] = "RETURN"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.NodeSocketAddress

tc, client = infrastructure.StartNNodeTopology(2, opts, infra)

Expand Down
Loading