Skip to content

Commit 0a91e70

Browse files
committed
more
1 parent 1b4462b commit 0a91e70

File tree

5 files changed

+82
-27
lines changed

5 files changed

+82
-27
lines changed

felix/collector/goldmane/client.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ type GoldmaneReporter struct {
4141
once sync.Once
4242

4343
// Fields related to goldmane unix socket
44-
maySendToNodeSocket bool
45-
nodeClient *client.FlowClient
46-
nodeClientLock sync.RWMutex
44+
mayReportToNodeSocket bool
45+
nodeClient *client.FlowClient
46+
nodeClientLock sync.RWMutex
4747
}
4848

4949
func NewReporter(addr, cert, key, ca string) (*GoldmaneReporter, error) {
@@ -56,7 +56,7 @@ func NewReporter(addr, cert, key, ca string) (*GoldmaneReporter, error) {
5656
client: cli,
5757

5858
// Do not send flowlogs to node socket, if goldmane address set via FelixConfig is equal to node socket
59-
maySendToNodeSocket: addr != LocalGoldmaneServer,
59+
mayReportToNodeSocket: addr != LocalGoldmaneServer,
6060
}, nil
6161
}
6262

@@ -66,7 +66,7 @@ func (g *GoldmaneReporter) Start() error {
6666
// We don't wait for the initial connection to start so we don't block the caller.
6767
g.client.Connect(context.Background())
6868

69-
if g.maySendToNodeSocket {
69+
if g.mayReportToNodeSocket {
7070
go g.nodeSocketReporter()
7171
}
7272
})
@@ -97,6 +97,7 @@ func (g *GoldmaneReporter) nodeClientIsNil() bool {
9797
}
9898

9999
func (g *GoldmaneReporter) mayStartNodeSocketReporter() {
100+
// If node socket is already setup, do not try to set it up again.
100101
if !g.nodeClientIsNil() {
101102
return
102103
}
@@ -110,19 +111,20 @@ func (g *GoldmaneReporter) mayStartNodeSocketReporter() {
110111
logrus.WithError(err).Warn("Failed to create goldmane unix client")
111112
return
112113
}
113-
logrus.Info("Created goldmane unix client")
114+
logrus.Info("Created goldmane node client")
114115
g.nodeClient.Connect(context.Background())
115116
}
116117

117118
func (g *GoldmaneReporter) mayStopNodeSocketReporter() {
119+
// If node socket is already closed, do not try to close it again.
118120
if g.nodeClientIsNil() {
119121
return
120122
}
121123

122124
g.nodeClientLock.Lock()
123125
defer g.nodeClientLock.Unlock()
124126
g.nodeClient = nil
125-
logrus.Info("Destroyed goldmane unix client")
127+
logrus.Info("Destroyed goldmane node client")
126128
}
127129

128130
func (g *GoldmaneReporter) Report(logSlice any) error {
@@ -135,8 +137,8 @@ func (g *GoldmaneReporter) Report(logSlice any) error {
135137
goldmaneLog := convertFlowlogToGoldmane(l)
136138
g.client.Push(goldmaneLog)
137139

138-
if g.maySendToNodeSocket {
139-
// If goldmane local unix server exists, also send it flowlogs.
140+
if g.mayReportToNodeSocket {
141+
// If goldmane node server exists, also send flowlogs to it.
140142
g.nodeClientLock.RLock()
141143
if g.nodeClient != nil {
142144
g.nodeClient.Push(goldmaneLog)

felix/collector/goldmane/local_server.go renamed to felix/collector/goldmane/node_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (s *NodeServer) Run() error {
101101
}
102102

103103
func (s *NodeServer) Stop() {
104-
s.grpcServer.GracefulStop()
104+
s.grpcServer.Stop()
105105
}
106106

107107
func (s *NodeServer) List() []*types.Flow {

goldmane/pkg/types/flow.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,22 @@ func (k *FlowKey) DestPort() int64 {
122122
return k.dest.Value().DestPort
123123
}
124124

125+
func (k *FlowKey) DestServiceName() string {
126+
return k.dest.Value().DestServiceName
127+
}
128+
129+
func (k *FlowKey) DestServiceNamespace() string {
130+
return k.dest.Value().DestServiceNamespace
131+
}
132+
133+
func (k *FlowKey) DestServicePortName() string {
134+
return k.dest.Value().DestServicePortName
135+
}
136+
137+
func (k *FlowKey) DestServicePort() int64 {
138+
return k.dest.Value().DestServicePort
139+
}
140+
125141
// This struct should be an exact copy of the proto.Flow structure, but without the private fields.
126142
type Flow struct {
127143
Key *FlowKey

node/cmd/calico-node/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ var runStatusReporter = flagSet.Bool("status-reporter", false, "Run node status
7373
var showStatus = flagSet.Bool("show-status", false, "Print out node status")
7474

7575
// Options for node flowlogs.
76-
var watchFlowlogs = flagSet.Bool("watch-flowlogs", false, "Watch for node flowlogs")
76+
var fetchFlowlogs = flagSet.Int("flowlogs", 0, "Fetch a number of flowlogs. Use a negative value to watch forever.")
7777

7878
// confd flags
7979
var runConfd = flagSet.Bool("confd", false, "Run confd")
@@ -176,8 +176,8 @@ func main() {
176176
} else if *showStatus {
177177
status.Show()
178178
os.Exit(0)
179-
} else if *watchFlowlogs {
180-
flowlogs.StartServerAndWatch()
179+
} else if *fetchFlowlogs != 0 {
180+
flowlogs.StartServerAndWatch(*fetchFlowlogs)
181181
os.Exit(0)
182182
} else {
183183
fmt.Println("No valid options provided. Usage:")

node/pkg/flowlogs/flowlogs.go

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
package flowlogs
216

317
import (
@@ -15,7 +29,7 @@ import (
1529
"github.com/sirupsen/logrus"
1630
)
1731

18-
func StartServerAndWatch() {
32+
func StartServerAndWatch(num int) {
1933
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
2034
defer stop()
2135

@@ -31,29 +45,44 @@ func StartServerAndWatch() {
3145
return
3246
}
3347

48+
infinitLoop := num < 0
49+
var count int
3450
for {
35-
if ctx.Err() != nil {
36-
logrus.Info("Closing goldmane unix server")
51+
if ctx.Err() != nil ||
52+
(!infinitLoop && count >= num) {
53+
logrus.Debug("Closing goldmane unix server")
54+
nodeServer.Stop()
3755
cleanupGoldmaneSocket()
3856
return
3957
}
58+
4059
flows := nodeServer.ListAndFlush()
4160
for _, flow := range flows {
42-
fmt.Printf("%s\n", flowToString(flow))
61+
fmt.Printf("%s", flowToString(flow))
4362
}
63+
count = count + len(flows)
4464
time.Sleep(time.Second)
4565
}
4666
}
4767

4868
func flowToString(f *types.Flow) string {
49-
output := fmt.Sprintf("Src={%s(%s/%s) %vP %vB} Dst={%s(%s/%s) %vP %vB} Proto=%s(%v) Action=%v",
50-
endpointTypeToString(f.Key.SourceType()), f.Key.SourceNamespace(), f.Key.SourceName(), f.PacketsIn, f.BytesIn,
51-
endpointTypeToString(f.Key.DestType()), f.Key.DestNamespace(), f.Key.DestName(), f.PacketsOut, f.BytesOut,
52-
f.Key.Proto(), f.Key.DestPort(),
53-
f.Key.Action(),
69+
startTime := time.Unix(f.StartTime, 0)
70+
policyTrace := types.FlowLogPolicyToProto(f.Key.Policies())
71+
return fmt.Sprintf(
72+
"- Time=%v Reporter=%v Action=%v\n"+
73+
" Src=%s(%s/%s) Dst=%s(%s/%s) Svc=%s/%s Proto=%s(%v svc:%s/%v)\n"+
74+
" Counts={Ingress: %vPkts/%vBytes Egress:%vPkts/%vBytes} Connections={Started:%v Completed:%v Live:%v}\n"+
75+
" Enforced:\n%v\n"+
76+
" Pending:\n%v\n",
77+
startTime, f.Key.Reporter(), f.Key.Action(),
78+
endpointTypeToString(f.Key.SourceType()), f.Key.SourceNamespace(), f.Key.SourceName(),
79+
endpointTypeToString(f.Key.DestType()), f.Key.DestNamespace(), f.Key.DestName(),
80+
f.Key.DestServiceName(), f.Key.DestServiceNamespace(),
81+
f.Key.Proto(), f.Key.DestPort(), f.Key.DestServicePortName(), f.Key.DestServicePort(),
82+
f.PacketsIn, f.BytesIn, f.PacketsOut, f.BytesOut,
83+
f.NumConnectionsStarted, f.NumConnectionsCompleted, f.NumConnectionsLive,
84+
policyHitsToString(policyTrace.EnforcedPolicies), policyHitsToString(policyTrace.PendingPolicies),
5485
)
55-
56-
return output
5786
}
5887

5988
func endpointTypeToString(ep proto.EndpointType) string {
@@ -71,17 +100,25 @@ func endpointTypeToString(ep proto.EndpointType) string {
71100
}
72101
}
73102

103+
func policyHitsToString(policies []*proto.PolicyHit) string {
104+
var out string
105+
for _, p := range policies {
106+
out = out + fmt.Sprintf(" - %v", p)
107+
}
108+
return out
109+
}
110+
74111
func ensureGoldmaneSocketDirectory(addr string) error {
75112
path := path.Dir(addr)
76113
// Check if goldmane unix server exists at the expected location.
77-
logrus.Info("Checking if goldmane unix server exists.")
114+
logrus.Debug("Checking if goldmane unix server exists.")
78115
if _, err := os.Stat(path); os.IsNotExist(err) {
79-
logrus.WithField("path", path).Info("Goldmane unix socket directory does not exist.")
116+
logrus.WithField("path", path).Debug("Goldmane unix socket directory does not exist.")
80117
err := os.MkdirAll(path, 0o600)
81118
if err != nil {
82119
return err
83120
}
84-
logrus.WithField("path", path).Info("Created goldmane unix server directory.")
121+
logrus.WithField("path", path).Debug("Created goldmane unix server directory.")
85122
}
86123
return nil
87124
}

0 commit comments

Comments
 (0)