Skip to content

Commit 791a7b6

Browse files
Merge pull request #10037 from caseydavenport/casey-daemon-fv
Add basic daemon FV
2 parents 4e322b1 + 304a5a6 commit 791a7b6

File tree

4 files changed

+289
-3
lines changed

4 files changed

+289
-3
lines changed

goldmane/fv/daemon_test.go

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package fv
2+
3+
import (
4+
"context"
5+
"crypto/x509"
6+
"fmt"
7+
"net/http"
8+
"net/http/httptest"
9+
"os"
10+
"sync"
11+
"testing"
12+
"time"
13+
14+
. "github.com/onsi/gomega"
15+
"github.com/sirupsen/logrus"
16+
"github.com/stretchr/testify/require"
17+
"google.golang.org/grpc"
18+
19+
"github.com/projectcalico/calico/goldmane/pkg/client"
20+
"github.com/projectcalico/calico/goldmane/pkg/daemon"
21+
"github.com/projectcalico/calico/goldmane/pkg/testutils"
22+
"github.com/projectcalico/calico/goldmane/proto"
23+
"github.com/projectcalico/calico/lib/std/cryptoutils"
24+
"github.com/projectcalico/calico/libcalico-go/lib/logutils"
25+
)
26+
27+
var (
28+
ctx context.Context
29+
goldmaneURL string
30+
clientCA string
31+
clientCert string
32+
clientKey string
33+
34+
emitted *emissionCounter
35+
)
36+
37+
func daemonSetup(t *testing.T, cfg daemon.Config) func() {
38+
RegisterTestingT(t)
39+
logrus.SetLevel(logrus.DebugLevel)
40+
logutils.ConfigureFormatter("daemonfv")
41+
logCancel := logutils.RedirectLogrusToTestingT(t)
42+
43+
// The context acts as a global timeout for the test to make sure we don't hang.
44+
var cancel context.CancelFunc
45+
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
46+
47+
// Create TLS credentials for Goldmane.
48+
cert, key := createKeyCertPair(os.TempDir())
49+
50+
// Create TLS credentials for the client.
51+
cliCert, cliKey := createKeyCertPair(os.TempDir())
52+
53+
// Store the file paths for the client certificates.
54+
clientCA = cert.Name()
55+
clientKey = cliKey.Name()
56+
clientCert = cliCert.Name()
57+
58+
// Augment the configuration with the paths to the certificates.
59+
cfg.ServerCertPath = cert.Name()
60+
cfg.ServerKeyPath = key.Name()
61+
cfg.CACertPath = cliCert.Name()
62+
63+
// Start a test HTTP server that we can point the emitter at to verify
64+
// flows are being emitted.
65+
emitted = &emissionCounter{}
66+
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
67+
logrus.WithField("path", r.URL.Path).Info("[TEST] Received request")
68+
emitted.Inc()
69+
}))
70+
cfg.PushURL = testServer.URL
71+
72+
// Run the daemon.
73+
go daemon.Run(ctx, cfg)
74+
75+
goldmaneURL = fmt.Sprintf("localhost:%d", cfg.Port)
76+
77+
return func() {
78+
logCancel()
79+
cancel()
80+
}
81+
}
82+
83+
type emissionCounter struct {
84+
sync.Mutex
85+
flows int
86+
}
87+
88+
func (t *emissionCounter) Inc() {
89+
t.Lock()
90+
defer t.Unlock()
91+
t.flows++
92+
}
93+
94+
func (t *emissionCounter) Count() int {
95+
t.Lock()
96+
defer t.Unlock()
97+
return t.flows
98+
}
99+
100+
func createKeyCertPair(dir string) (*os.File, *os.File) {
101+
certPEM, keyPEM, err := cryptoutils.GenerateSelfSignedCert(
102+
cryptoutils.WithDNSNames("localhost"),
103+
cryptoutils.WithExtKeyUsages(x509.ExtKeyUsageAny),
104+
)
105+
Expect(err).ShouldNot(HaveOccurred())
106+
107+
certFile, err := os.CreateTemp(dir, "cert.pem")
108+
Expect(err).ShouldNot(HaveOccurred())
109+
defer certFile.Close()
110+
111+
keyFile, err := os.CreateTemp(dir, "key.pem")
112+
Expect(err).ShouldNot(HaveOccurred())
113+
defer keyFile.Close()
114+
115+
_, err = certFile.Write(certPEM)
116+
Expect(err).ShouldNot(HaveOccurred())
117+
_, err = keyFile.Write(keyPEM)
118+
Expect(err).ShouldNot(HaveOccurred())
119+
120+
return certFile, keyFile
121+
}
122+
123+
// TestDaemonCanary acts as a baseline test to ensure we can start the daemon and connect to it.
124+
// If this test fails, it likely means something fundamental is wrong.
125+
func TestDaemonCanary(t *testing.T) {
126+
cfg := daemon.Config{
127+
LogLevel: "debug",
128+
Port: 8988,
129+
AggregationWindow: time.Second * 1,
130+
EmitAfterSeconds: 2,
131+
EmitterAggregationWindow: time.Second * 2,
132+
}
133+
defer daemonSetup(t, cfg)()
134+
135+
// Generate credentials for the Goldmane client.
136+
creds, err := client.ClientCredentials(clientCert, clientKey, clientCA)
137+
if err != nil {
138+
logrus.WithError(err).Fatal("Failed to create goldmane TLS credentials.")
139+
}
140+
141+
// Verify we can connect to the server.
142+
cli, err := client.NewFlowsAPIClient(goldmaneURL, grpc.WithTransportCredentials(creds))
143+
require.NoError(t, err)
144+
require.NotNil(t, cli)
145+
146+
// Verify we can list flows.
147+
Eventually(func() error {
148+
_, err = cli.List(ctx, nil)
149+
return err
150+
}, 5*time.Second, 1*time.Second).Should(Succeed())
151+
}
152+
153+
// TestFlows tests that we can ingest flows, that they show up in List reqeusts, and that they
154+
// are emitted to the configured endpoint.
155+
func TestFlows(t *testing.T) {
156+
cfg := daemon.Config{
157+
LogLevel: "debug",
158+
Port: 8988,
159+
AggregationWindow: time.Second * 1,
160+
EmitAfterSeconds: 2,
161+
EmitterAggregationWindow: time.Second * 2,
162+
}
163+
defer daemonSetup(t, cfg)()
164+
165+
// Generate credentials for the Goldmane client.
166+
creds, err := client.ClientCredentials(clientCert, clientKey, clientCA)
167+
if err != nil {
168+
logrus.WithError(err).Fatal("Failed to create goldmane TLS credentials.")
169+
}
170+
171+
// Create a client to interact with Flows.
172+
cli, err := client.NewFlowsAPIClient("localhost:8988", grpc.WithTransportCredentials(creds))
173+
require.NoError(t, err)
174+
require.NotNil(t, cli)
175+
176+
// Create a client to pusher Flows.
177+
pusher, err := client.NewFlowClient(goldmaneURL, clientCert, clientKey, clientCA)
178+
require.NoError(t, err)
179+
180+
connected := pusher.Connect(ctx)
181+
require.NoError(t, err)
182+
Eventually(connected, 5*time.Second, 100*time.Millisecond).Should(BeClosed())
183+
184+
// Start a goroutine to continuously send flows.
185+
go func(ctx context.Context) {
186+
for {
187+
if ctx.Err() != nil {
188+
return
189+
}
190+
f := testutils.NewRandomFlow(time.Now().Unix())
191+
pusher.Push(f)
192+
time.Sleep(100 * time.Millisecond)
193+
}
194+
}(ctx)
195+
196+
// Verify we can list flows.
197+
var flows []*proto.FlowResult
198+
Eventually(func() error {
199+
flows, err = cli.List(ctx, nil)
200+
if err != nil {
201+
return err
202+
}
203+
if len(flows) == 0 {
204+
return fmt.Errorf("no flows returned")
205+
}
206+
return nil
207+
}, 5*time.Second, 1*time.Second).Should(Succeed())
208+
209+
// We should eventually see flows emitted.
210+
// Sincse we only emit after 2 seconds with an emitter aggregation window of 2 seconds, we
211+
// should see at least one flow emitted after 4 seconds. We'll wait for 10 seconds to be sure.
212+
Eventually(emitted.Count, 10*time.Second, 1*time.Second).Should(BeNumerically(">", 0))
213+
}
214+
215+
// TestHints tests that we can successfully retrieve hints from generated flows.
216+
func TestHints(t *testing.T) {
217+
cfg := daemon.Config{
218+
LogLevel: "debug",
219+
Port: 8988,
220+
AggregationWindow: time.Second * 1,
221+
EmitAfterSeconds: 2,
222+
EmitterAggregationWindow: time.Second * 2,
223+
}
224+
defer daemonSetup(t, cfg)()
225+
226+
// Generate credentials for the Goldmane client.
227+
creds, err := client.ClientCredentials(clientCert, clientKey, clientCA)
228+
if err != nil {
229+
logrus.WithError(err).Fatal("Failed to create goldmane TLS credentials.")
230+
}
231+
232+
// Create a client to interact with Flows.
233+
cli, err := client.NewFlowsAPIClient("localhost:8988", grpc.WithTransportCredentials(creds))
234+
require.NoError(t, err)
235+
require.NotNil(t, cli)
236+
237+
// Create a client to pusher Flows.
238+
pusher, err := client.NewFlowClient(goldmaneURL, clientCert, clientKey, clientCA)
239+
require.NoError(t, err)
240+
241+
connected := pusher.Connect(ctx)
242+
require.NoError(t, err)
243+
Eventually(connected, 5*time.Second, 100*time.Millisecond).Should(BeClosed())
244+
245+
// Start a goroutine to continuously send flows.
246+
go func(ctx context.Context) {
247+
for {
248+
if ctx.Err() != nil {
249+
return
250+
}
251+
f := testutils.NewRandomFlow(time.Now().Unix())
252+
pusher.Push(f)
253+
time.Sleep(100 * time.Millisecond)
254+
}
255+
}(ctx)
256+
257+
// Verify we can list flows.
258+
var hints []*proto.FilterHint
259+
req := &proto.FilterHintsRequest{
260+
Type: proto.FilterType_FilterTypeDestNamespace,
261+
}
262+
Eventually(func() error {
263+
hints, err = cli.FiltersHints(ctx, req)
264+
if err != nil {
265+
return err
266+
}
267+
if len(hints) == 0 {
268+
return fmt.Errorf("no hints returned")
269+
}
270+
return nil
271+
}, 5*time.Second, 1*time.Second).Should(Succeed())
272+
}

goldmane/pkg/aggregator/bucketing/bucket_ring.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,11 @@ func (r *BucketRing) EmitFlowCollections(sink Sink) {
329329
for i := len(collections) - 1; i >= 0; i-- {
330330
c := collections[i]
331331
if len(c.Flows) > 0 {
332+
logrus.WithFields(logrus.Fields{
333+
"start": c.StartTime,
334+
"end": c.EndTime,
335+
"num": len(c.Flows),
336+
}).Debug("Emitting flow collection")
332337
sink.Receive(c)
333338
c.Complete()
334339
}

goldmane/pkg/client/tls.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ package client
1616
import (
1717
"crypto/tls"
1818
"crypto/x509"
19+
"fmt"
1920
"os"
2021

22+
"github.com/sirupsen/logrus"
2123
"google.golang.org/grpc/credentials"
2224
)
2325

@@ -34,15 +36,20 @@ func ClientCredentials(cert, key, ca string) (credentials.TransportCredentials,
3436

3537
func tlsConfig(cert, key, caFile string) (*tls.Config, error) {
3638
// Load client cert.
39+
logrus.WithFields(logrus.Fields{
40+
"cert": cert,
41+
"key": key,
42+
}).Debug("Loading client cert and key")
3743
certificate, err := tls.LoadX509KeyPair(cert, key)
3844
if err != nil {
39-
return nil, err
45+
return nil, fmt.Errorf("failed to load keypair: %s", err)
4046
}
4147

4248
// Load CA cert.
49+
logrus.WithField("ca", caFile).Debug("Loading CA cert")
4350
caCert, err := os.ReadFile(caFile)
4451
if err != nil {
45-
return nil, err
52+
return nil, fmt.Errorf("failed to load CA: %s", err)
4653
}
4754
caCertPool := x509.NewCertPool()
4855
caCertPool.AppendCertsFromPEM(caCert)

goldmane/pkg/emitter/emitter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ func (e *Emitter) Run(ctx context.Context) {
111111
defer e.q.ShutDown()
112112
select {
113113
case <-ctx.Done():
114+
logrus.Info("Context cancelled, shutting down emitter.")
114115
case <-done:
116+
logrus.Info("Emitter shutting down.")
115117
}
116118
}()
117119

@@ -130,7 +132,7 @@ func (e *Emitter) Run(ctx context.Context) {
130132
// Get pending work from the queue.
131133
key, quit := e.q.Get()
132134
if quit {
133-
logrus.WithField("cm", configMapKey).Info("Emitter shutting down.")
135+
logrus.Info("Emitter queue completed")
134136
return
135137
}
136138
e.q.Done(key)

0 commit comments

Comments
 (0)