Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flowlogs: make calico-node fetch flow logs from a node #10144

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion felix/collector/goldmane/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package goldmane
import (
"context"
"fmt"
"os"
"sort"
"strings"
"sync"
Expand All @@ -38,6 +39,11 @@ type GoldmaneReporter struct {
address string
client *client.FlowClient
once sync.Once

// Fields related to goldmane unix socket
maySendToNodeSocket bool
nodeClient *client.FlowClient
nodeClientLock sync.RWMutex
}

func NewReporter(addr, cert, key, ca string) (*GoldmaneReporter, error) {
Expand All @@ -48,6 +54,9 @@ func NewReporter(addr, cert, key, ca string) (*GoldmaneReporter, error) {
return &GoldmaneReporter{
address: addr,
client: cli,

// Do not send flowlogs to node socket, if goldmane address set via FelixConfig is equal to node socket
maySendToNodeSocket: addr != LocalGoldmaneServer,
}, nil
}

Expand All @@ -56,18 +65,84 @@ func (g *GoldmaneReporter) Start() error {
g.once.Do(func() {
// We don't wait for the initial connection to start so we don't block the caller.
g.client.Connect(context.Background())

if g.maySendToNodeSocket {
go g.nodeSocketReporter()
}
})
return err
}

// nodeSocketReporter keeps the node-local goldmane client in step with the
// presence of the unix socket file. On every pass it creates the client when
// the socket exists and drops it when the socket is gone, then sleeps for ten
// seconds before checking again. The loop has no exit condition, so this
// method never returns.
func (g *GoldmaneReporter) nodeSocketReporter() {
	const pollInterval = time.Second * 10

	// The post statement runs after every pass, including the ones that
	// `continue`, so each check is followed by exactly one sleep.
	for ; ; time.Sleep(pollInterval) {
		if !NodeSocketExists() {
			g.mayStopNodeSocketReporter()
			continue
		}
		g.mayStartNodeSocketReporter()
	}
}

// NodeSocketExists reports whether os.Stat succeeds on LocalGoldmaneServer.
// Any stat error (missing file, permission problem, ...) yields false. The
// check does not verify that the path is actually a socket.
func NodeSocketExists() bool {
	if _, statErr := os.Stat(LocalGoldmaneServer); statErr != nil {
		return false
	}
	return true
}

// nodeClientIsNil reports whether no node-local client is currently set. The
// field is read while holding nodeClientLock for reading.
func (g *GoldmaneReporter) nodeClientIsNil() bool {
	g.nodeClientLock.RLock()
	unset := g.nodeClient == nil
	g.nodeClientLock.RUnlock()
	return unset
}

// mayStartNodeSocketReporter creates and connects the node-local goldmane
// client (over the unix socket at LocalGoldmaneServer) if none is set yet.
//
// On failure to create the client a warning is logged and g.nodeClient is left
// untouched, so the shared field only ever holds a successfully created client.
func (g *GoldmaneReporter) mayStartNodeSocketReporter() {
	// Cheap read-locked check first so the common "already running" case does
	// not take the write lock.
	if !g.nodeClientIsNil() {
		return
	}

	g.nodeClientLock.Lock()
	defer g.nodeClientLock.Unlock()

	// Re-check under the write lock: the state may have changed between the
	// read-locked check above and acquiring the write lock.
	if g.nodeClient != nil {
		return
	}

	sockAddr := fmt.Sprintf("unix://%v", LocalGoldmaneServer)
	// Build into a local and publish only on success, rather than assigning
	// the result of a failed constructor call to the shared field.
	cli, err := client.NewFlowClient(sockAddr, "", "", "")
	if err != nil {
		logrus.WithError(err).Warn("Failed to create goldmane unix client")
		return
	}
	logrus.Info("Created goldmane unix client")
	cli.Connect(context.Background())
	g.nodeClient = cli
}

// mayStopNodeSocketReporter forgets the node-local goldmane client, if one is
// set, by resetting g.nodeClient to nil under the write lock.
//
// NOTE(review): the client is only dereferenced here; nothing in this function
// closes it or cancels the Connect started with context.Background(). Confirm
// whether FlowClient needs an explicit shutdown to avoid leaking resources.
func (g *GoldmaneReporter) mayStopNodeSocketReporter() {
	if hasClient := !g.nodeClientIsNil(); hasClient {
		g.nodeClientLock.Lock()
		defer g.nodeClientLock.Unlock()

		g.nodeClient = nil
		logrus.Info("Destroyed goldmane unix client")
	}
}

func (g *GoldmaneReporter) Report(logSlice any) error {
switch logs := logSlice.(type) {
case []*flowlog.FlowLog:
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithField("num", len(logs)).Debug("Dispatching flow logs to goldmane")
}
for _, l := range logs {
g.client.Push(convertFlowlogToGoldmane(l))
goldmaneLog := convertFlowlogToGoldmane(l)
g.client.Push(goldmaneLog)

if g.maySendToNodeSocket {
// If goldmane local unix server exists, also send it flowlogs.
g.nodeClientLock.RLock()
if g.nodeClient != nil {
g.nodeClient.Push(goldmaneLog)
}
g.nodeClientLock.RUnlock()
}
}
default:
logrus.Panic("Unexpected kind of log dispatcher")
Expand Down
117 changes: 117 additions & 0 deletions felix/collector/goldmane/local_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package goldmane

import (
"net"
"sync"

"github.com/projectcalico/calico/goldmane/pkg/server"
"github.com/projectcalico/calico/goldmane/pkg/types"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

// LocalGoldmaneServer is the filesystem path of the node-local goldmane unix
// socket.
const LocalGoldmaneServer = "/var/log/calico/flowlogs/goldmane.sock"

// flowStore accumulates received flows in memory. The methods below take lock
// before reading or replacing flows.
type flowStore struct {
// lock guards flows.
lock sync.RWMutex
// flows holds the buffered flows; Receive appends new entries to the end.
flows []*types.Flow
}

// newFlowStore returns an empty, ready-to-use flowStore (its zero value).
func newFlowStore() *flowStore {
	return new(flowStore)
}

// Receive appends f to the end of the buffered flows while holding the write
// lock.
func (s *flowStore) Receive(f *types.Flow) {
	s.lock.Lock()
	s.flows = append(s.flows, f)
	s.lock.Unlock()
}

// List returns a snapshot of the flows currently buffered in the store,
// without removing them.
//
// The returned slice is a copy: it does not share a backing array with the
// store, so a caller appending to it cannot overwrite entries that Receive
// adds later. The *types.Flow elements themselves are not copied. When the
// store is empty the result is nil.
func (s *flowStore) List() []*types.Flow {
	// Reading only, so a shared lock is sufficient.
	s.lock.RLock()
	defer s.lock.RUnlock()

	// Appending onto a nil slice copies the elements and yields nil when
	// there is nothing to copy.
	return append([]*types.Flow(nil), s.flows...)
}

// Flush discards every buffered flow by resetting the slice to nil under the
// write lock.
func (s *flowStore) Flush() {
	s.lock.Lock()
	s.flows = nil
	s.lock.Unlock()
}

// ListAndFlush hands the buffered flows to the caller and empties the store
// in one step, all under a single acquisition of the write lock.
func (s *flowStore) ListAndFlush() []*types.Flow {
	s.lock.Lock()
	defer s.lock.Unlock()

	var drained []*types.Flow
	// Swap: the caller takes the current slice, the store restarts from nil.
	drained, s.flows = s.flows, nil
	return drained
}

// NodeServer bundles a gRPC server listening on a unix socket with the
// in-memory flowStore that List, Flush and ListAndFlush read from.
type NodeServer struct {
// store is the buffer exposed through List, Flush and ListAndFlush.
store *flowStore
// grpcServer is the server started by Run and stopped by Stop.
grpcServer *grpc.Server
// once ensures the listen-and-serve sequence in Run executes at most once.
once sync.Once
// sockAddr is the unix socket path passed to net.Listen in Run.
sockAddr string
}

// NewNodeServer builds a NodeServer for the unix socket path addr. It creates
// a fresh gRPC server and an empty flow store, then registers a flow collector
// constructed from that store with the gRPC server. Nothing is listening until
// Run is called.
func NewNodeServer(addr string) *NodeServer {
	srv := &NodeServer{
		sockAddr:   addr,
		grpcServer: grpc.NewServer(),
		store:      newFlowStore(),
	}

	collector := server.NewFlowCollector(srv.store)
	collector.RegisterWith(srv.grpcServer)

	return srv
}

// Run opens the unix socket at s.sockAddr and starts serving gRPC on it.
//
// The listen-and-serve sequence executes at most once per NodeServer; later
// calls do nothing and return nil. Serving happens on a background goroutine,
// so Run returns as soon as the listener has been created.
//
// It returns the error from net.Listen when the socket cannot be opened. An
// error returned later by Serve is logged rather than returned, since Run has
// already returned by then.
func (s *NodeServer) Run() error {
	var listenErr error
	s.once.Do(func() {
		var l net.Listener
		l, listenErr = net.Listen("unix", s.sockAddr)
		if listenErr != nil {
			return
		}
		logrus.Infof("Running goldmane local server at %v", s.sockAddr)
		go func() {
			// Use a goroutine-local variable: writing to listenErr here would
			// race with the return statement below.
			if serveErr := s.grpcServer.Serve(l); serveErr != nil {
				logrus.WithError(serveErr).Warn("Goldmane local server stopped serving")
			}
		}()
	})
	return listenErr
}

// Stop shuts the gRPC server down by calling its GracefulStop method.
func (s *NodeServer) Stop() {
	grpcSrv := s.grpcServer
	grpcSrv.GracefulStop()
}

// List returns whatever the underlying flow store's List method returns; the
// store is left unchanged.
func (s *NodeServer) List() []*types.Flow {
	flows := s.store.List()
	return flows
}

// Flush empties the underlying flow store.
func (s *NodeServer) Flush() {
	store := s.store
	store.Flush()
}

// ListAndFlush returns the flows held by the underlying store and empties it,
// delegating to the store's ListAndFlush.
func (s *NodeServer) ListAndFlush() []*types.Flow {
	drained := s.store.ListAndFlush()
	return drained
}
7 changes: 4 additions & 3 deletions felix/fv/flow_logs_goldmane_staged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/projectcalico/calico/felix/bpf/conntrack"
"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/felix/collector/types/endpoint"
"github.com/projectcalico/calico/felix/collector/types/tuple"
"github.com/projectcalico/calico/felix/fv/connectivity"
Expand Down Expand Up @@ -100,7 +101,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log with stag

opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "5"
opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.LocalGoldmaneServer

// Start felix instances.
tc, client = infrastructure.StartNNodeTopology(2, opts, infra)
Expand Down Expand Up @@ -823,7 +824,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ aggregation of flow log wit

opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "5"
opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.LocalGoldmaneServer

// Start felix instances.
tc, client = infrastructure.StartNNodeTopology(2, opts, infra)
Expand Down Expand Up @@ -1572,7 +1573,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log with stag

opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "3"
opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.LocalGoldmaneServer

// Start felix instances.
tc, client = infrastructure.StartNNodeTopology(2, opts, infra)
Expand Down
7 changes: 4 additions & 3 deletions felix/fv/flow_logs_goldmane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/projectcalico/calico/felix/bpf/conntrack"
"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/felix/collector/types/endpoint"
"github.com/projectcalico/calico/felix/collector/types/tuple"
"github.com/projectcalico/calico/felix/fv/connectivity"
Expand Down Expand Up @@ -82,7 +83,7 @@ import (
// Flow logs have little to do with the backend, and these tests are relatively slow, so
// better to run with one backend only. etcdv3 is easier because we create a fresh
// datastore for every test and so don't need to worry about cleaning resources up.
var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log tests", []apiconfig.DatastoreType{apiconfig.EtcdV3}, func(getInfra infrastructure.InfraFactory) {
var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ pepper goldmane flow log tests", []apiconfig.DatastoreType{apiconfig.EtcdV3}, func(getInfra infrastructure.InfraFactory) {
bpfEnabled := os.Getenv("FELIX_FV_ENABLE_BPF") == "true"

var (
Expand All @@ -105,7 +106,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ goldmane flow log tests", [

opts.ExtraEnvVars["FELIX_FLOWLOGSCOLLECTORDEBUGTRACE"] = "true"
opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "2"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.LocalGoldmaneServer
})

JustBeforeEach(func() {
Expand Down Expand Up @@ -591,7 +592,7 @@ var _ = infrastructure.DatastoreDescribe("goldmane flow log ipv6 tests", []apico
opts.ExtraEnvVars["FELIX_FLOWLOGSFLUSHINTERVAL"] = "2"
opts.ExtraEnvVars["FELIX_IPV6SUPPORT"] = "true"
opts.ExtraEnvVars["FELIX_DefaultEndpointToHostAction"] = "RETURN"
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = flowlogs.LocalGoldmaneServer
opts.ExtraEnvVars["FELIX_FLOWLOGSGOLDMANESERVER"] = goldmane.LocalGoldmaneServer

tc, client = infrastructure.StartNNodeTopology(2, opts, infra)

Expand Down
7 changes: 3 additions & 4 deletions felix/fv/infrastructure/felix.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/projectcalico/calico/felix/collector/flowlog"
"github.com/projectcalico/calico/felix/collector/goldmane"
"github.com/projectcalico/calico/felix/fv/containers"
"github.com/projectcalico/calico/felix/fv/flowlogs"
"github.com/projectcalico/calico/felix/fv/metrics"
"github.com/projectcalico/calico/felix/fv/tcpdump"
"github.com/projectcalico/calico/felix/fv/utils"
Expand Down Expand Up @@ -87,7 +86,7 @@ type Felix struct {
TopologyOptions TopologyOptions

uniqueName string
goldmaneServer *flowlogs.GoldmaneMock
goldmaneServer *goldmane.NodeServer
}

type workload interface {
Expand Down Expand Up @@ -206,10 +205,10 @@ func RunFelix(infra DatastoreInfra, id int, options TopologyOptions) *Felix {
Expect(os.MkdirAll(logDir, 0o777)).NotTo(HaveOccurred())
args = append(args, "-v", logDir+":/var/log/calico/flowlogs")

var goldmaneServer *flowlogs.GoldmaneMock
var goldmaneServer *goldmane.NodeServer
if options.FlowLogSource == FlowLogSourceGoldmane {
sockAddr := fmt.Sprintf("%v/goldmane.sock", logDir)
goldmaneServer = flowlogs.NewGoldmaneMock(sockAddr)
goldmaneServer = goldmane.NewNodeServer(sockAddr)
goldmaneServer.Run()
}

Expand Down
7 changes: 7 additions & 0 deletions node/cmd/calico-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/projectcalico/calico/node/cmd/calico-node/bpf"
"github.com/projectcalico/calico/node/pkg/allocateip"
"github.com/projectcalico/calico/node/pkg/cni"
"github.com/projectcalico/calico/node/pkg/flowlogs"
"github.com/projectcalico/calico/node/pkg/health"
"github.com/projectcalico/calico/node/pkg/hostpathinit"
"github.com/projectcalico/calico/node/pkg/lifecycle/shutdown"
Expand Down Expand Up @@ -71,6 +72,9 @@ var thresholdTime = flagSet.Duration("threshold-time", 30*time.Second, "Threshol
var runStatusReporter = flagSet.Bool("status-reporter", false, "Run node status reporter")
var showStatus = flagSet.Bool("show-status", false, "Print out node status")

// Options for node flowlogs.
var watchFlowlogs = flagSet.Bool("watch-flowlogs", false, "Watch for node flowlogs")

// confd flags
var runConfd = flagSet.Bool("confd", false, "Run confd")
var confdRunOnce = flagSet.Bool("confd-run-once", false, "Run confd in oneshot mode")
Expand Down Expand Up @@ -172,6 +176,9 @@ func main() {
} else if *showStatus {
status.Show()
os.Exit(0)
} else if *watchFlowlogs {
flowlogs.StartServerAndWatch()
os.Exit(0)
} else {
fmt.Println("No valid options provided. Usage:")
flagSet.PrintDefaults()
Expand Down
Loading