From b63744257dfca3f89dfd79cfec22f311ba864f13 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 1 Apr 2025 10:42:49 -0400 Subject: [PATCH 1/4] Add reusable Pinger controller to the pinger module ...that starts and stops Pinger instances to be shared by the cable engine and route agent health checkers. Signed-off-by: Tom Pantelis --- pkg/pinger/controller.go | 139 ++++++++++++++++++++++ pkg/pinger/controller_test.go | 198 ++++++++++++++++++++++++++++++++ pkg/pinger/pinger_suite_test.go | 9 ++ 3 files changed, 346 insertions(+) create mode 100644 pkg/pinger/controller.go create mode 100644 pkg/pinger/controller_test.go diff --git a/pkg/pinger/controller.go b/pkg/pinger/controller.go new file mode 100644 index 000000000..bd2d86a36 --- /dev/null +++ b/pkg/pinger/controller.go @@ -0,0 +1,139 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pinger + +import ( + "sync" + "time" + + "github.com/pkg/errors" + "github.com/submariner-io/admiral/pkg/log" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + k8snet "k8s.io/utils/net" +) + +type Controller interface { + EndpointCreatedOrUpdated(spec *submarinerv1.EndpointSpec) + EndpointRemoved(spec *submarinerv1.EndpointSpec) + Get(spec *submarinerv1.EndpointSpec, family k8snet.IPFamily) Interface + Stop() +} + +type ControllerConfig struct { + SupportedIPFamilies []k8snet.IPFamily + PingInterval int + MaxPacketLossCount int + NewPinger func(Config) Interface +} + +type controller struct { + ControllerConfig + sync.RWMutex + pingers map[string]Interface +} + +func NewController(config ControllerConfig) Controller { + if len(config.SupportedIPFamilies) == 0 { + panic(errors.New("SupportedIPFamilies must not be empty")) + } + + return &controller{ + ControllerConfig: config, + pingers: map[string]Interface{}, + } +} + +func (c *controller) EndpointCreatedOrUpdated(spec *submarinerv1.EndpointSpec) { + c.Lock() + defer c.Unlock() + + for _, family := range c.SupportedIPFamilies { + healthCheckIP := spec.GetHealthCheckIP(family) + if healthCheckIP == "" { + logger.Infof("IPv%v HealthCheckIP for Endpoint %q is empty - will not monitor endpoint health", + family, spec.CableName) + continue + } + + c.startPinger(spec.GetFamilyCableName(family), healthCheckIP) + } +} + +func (c *controller) startPinger(familyCableName, healthCheckIP string) { + if pingerObject, found := c.pingers[familyCableName]; found { + if pingerObject.GetIP() == healthCheckIP { + return + } + + logger.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", familyCableName) + pingerObject.Stop() + delete(c.pingers, familyCableName) + } + + pingerConfig := Config{ + IP: healthCheckIP, + MaxPacketLossCount: c.MaxPacketLossCount, + } + + if c.PingInterval != 0 { + pingerConfig.Interval = time.Second * time.Duration(c.PingInterval) + } + + 
newPingerFunc := c.NewPinger + if newPingerFunc == nil { + newPingerFunc = NewPinger + } + + pingerObject := newPingerFunc(pingerConfig) + c.pingers[familyCableName] = pingerObject + pingerObject.Start() + + logger.Infof("HealthChecker started pinger for CableName: %q with HealthCheckIP %q", familyCableName, healthCheckIP) +} + +func (c *controller) EndpointRemoved(spec *submarinerv1.EndpointSpec) { + c.Lock() + defer c.Unlock() + + for _, family := range c.SupportedIPFamilies { + familyCableName := spec.GetFamilyCableName(family) + if pingerObject, found := c.pingers[familyCableName]; found { + pingerObject.Stop() + delete(c.pingers, familyCableName) + } + } +} + +func (c *controller) Get(spec *submarinerv1.EndpointSpec, family k8snet.IPFamily) Interface { + c.Lock() + defer c.Unlock() + + return c.pingers[spec.GetFamilyCableName(family)] +} + +func (c *controller) Stop() { + c.Lock() + defer c.Unlock() + + for _, p := range c.pingers { + p.Stop() + } + + c.pingers = map[string]Interface{} +} diff --git a/pkg/pinger/controller_test.go b/pkg/pinger/controller_test.go new file mode 100644 index 000000000..60b1936d1 --- /dev/null +++ b/pkg/pinger/controller_test.go @@ -0,0 +1,198 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pinger_test + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + "github.com/submariner-io/submariner/pkg/pinger" + "github.com/submariner-io/submariner/pkg/pinger/fake" + k8snet "k8s.io/utils/net" +) + +var _ = Describe("Controller", func() { + const ( + remoteClusterID1 = "west" + remoteClusterID2 = "north" + healthCheckIP1 = "1.1.1.1" + healthCheckIP2 = "2.2.2.2" + healthCheckIP3 = "3.3.3.3" + pingInterval = 3 + maxPacketLossCount = 4 + ) + + var ( + controller pinger.Controller + supportedIPFamilies []k8snet.IPFamily + pingerMap map[string]*fake.Pinger + ) + + BeforeEach(func() { + supportedIPFamilies = []k8snet.IPFamily{k8snet.IPv4} + pingerMap = map[string]*fake.Pinger{ + healthCheckIP1: fake.NewPinger(healthCheckIP1), + healthCheckIP2: fake.NewPinger(healthCheckIP2), + } + }) + + JustBeforeEach(func() { + controller = pinger.NewController(pinger.ControllerConfig{ + SupportedIPFamilies: supportedIPFamilies, + PingInterval: pingInterval, + MaxPacketLossCount: maxPacketLossCount, + NewPinger: func(pingerCfg pinger.Config) pinger.Interface { + defer GinkgoRecover() + Expect(pingerCfg.Interval).To(Equal(time.Second * time.Duration(pingInterval))) + Expect(pingerCfg.MaxPacketLossCount).To(Equal(maxPacketLossCount)) + + p, ok := pingerMap[pingerCfg.IP] + Expect(ok).To(BeTrue()) + return p + }, + }) + }) + + When("Endpoints are created/removed", func() { + It("should start/stop Pingers", func() { + endpoint1 := newEndpointSpec(remoteClusterID1, healthCheckIP1) + controller.EndpointCreatedOrUpdated(endpoint1) + + pingerMap[healthCheckIP1].AwaitStart() + Expect(controller.Get(endpoint1, k8snet.IPv4)).To(Equal(pingerMap[healthCheckIP1])) + + endpoint2 := newEndpointSpec(remoteClusterID2, healthCheckIP2) + controller.EndpointCreatedOrUpdated(endpoint2) + + pingerMap[healthCheckIP2].AwaitStart() + Expect(controller.Get(endpoint2, k8snet.IPv4)).To(Equal(pingerMap[healthCheckIP2])) + + By("Removing Endpoints") + + 
controller.EndpointRemoved(endpoint1) + + pingerMap[healthCheckIP1].AwaitStop() + Expect(controller.Get(endpoint1, k8snet.IPv4)).To(BeNil()) + + controller.EndpointRemoved(endpoint2) + + pingerMap[healthCheckIP2].AwaitStop() + Expect(controller.Get(endpoint2, k8snet.IPv4)).To(BeNil()) + }) + }) + + When("an Endpoint is created/removed with dual-stack health check IPs", func() { + const healthCheckIPv6 = "2001:db8:3333:4444:5555:6666:7777:8888" + + BeforeEach(func() { + supportedIPFamilies = []k8snet.IPFamily{k8snet.IPv4, k8snet.IPv6} + pingerMap[healthCheckIPv6] = fake.NewPinger(healthCheckIPv6) + }) + + It("should start/stop Pingers", func() { + endpoint := newEndpointSpec(remoteClusterID1, healthCheckIP1, healthCheckIPv6) + controller.EndpointCreatedOrUpdated(endpoint) + + pingerMap[healthCheckIP1].AwaitStart() + pingerMap[healthCheckIPv6].AwaitStart() + + Expect(controller.Get(endpoint, k8snet.IPv4)).To(Equal(pingerMap[healthCheckIP1])) + Expect(controller.Get(endpoint, k8snet.IPv6)).To(Equal(pingerMap[healthCheckIPv6])) + + By("Removing Endpoint") + + controller.EndpointRemoved(endpoint) + + pingerMap[healthCheckIP1].AwaitStop() + Expect(controller.Get(endpoint, k8snet.IPv4)).To(BeNil()) + + pingerMap[healthCheckIPv6].AwaitStop() + Expect(controller.Get(endpoint, k8snet.IPv6)).To(BeNil()) + }) + }) + + When("an Endpoint is updated", func() { + var endpoint *submarinerv1.EndpointSpec + + JustBeforeEach(func() { + endpoint = newEndpointSpec(remoteClusterID1, healthCheckIP1) + controller.EndpointCreatedOrUpdated(endpoint) + pingerMap[healthCheckIP1].AwaitStart() + }) + + When("the HealthCheckIP was changed", func() { + BeforeEach(func() { + pingerMap[healthCheckIP3] = fake.NewPinger(healthCheckIP3) + }) + + It("should stop the Pinger and start a new one", func() { + endpoint.HealthCheckIPs = []string{healthCheckIP3} + + controller.EndpointCreatedOrUpdated(endpoint) + pingerMap[healthCheckIP1].AwaitStop() + pingerMap[healthCheckIP3].AwaitStart() + }) + }) + + 
When("the HealthCheckIP did not change", func() { + It("should not start a new Pinger", func() { + controller.EndpointCreatedOrUpdated(endpoint) + pingerMap[healthCheckIP1].AwaitNoStop() + }) + }) + }) + + When("an Endpoint has no HealthCheckIP", func() { + It("should not start a Pinger", func() { + controller.EndpointCreatedOrUpdated(newEndpointSpec(remoteClusterID1, "")) + pingerMap[healthCheckIP1].AwaitNoStart() + }) + }) + + When("no supported IP families are provided", func() { + It("should panic", func() { + Expect(func() { + pinger.NewController(pinger.ControllerConfig{}) + }).To(Panic()) + }) + }) + + Specify("Stop should stop all pingers", func() { + controller.EndpointCreatedOrUpdated(newEndpointSpec(remoteClusterID1, healthCheckIP1)) + pingerMap[healthCheckIP1].AwaitStart() + + controller.EndpointCreatedOrUpdated(newEndpointSpec(remoteClusterID2, healthCheckIP2)) + pingerMap[healthCheckIP2].AwaitStart() + + controller.Stop() + pingerMap[healthCheckIP1].AwaitStop() + pingerMap[healthCheckIP2].AwaitStop() + }) +}) + +func newEndpointSpec(clusterID string, healthCheckIPs ...string) *submarinerv1.EndpointSpec { + return &submarinerv1.EndpointSpec{ + ClusterID: clusterID, + CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", clusterID), + HealthCheckIPs: healthCheckIPs, + } +} diff --git a/pkg/pinger/pinger_suite_test.go b/pkg/pinger/pinger_suite_test.go index 59ba7dcd1..7eb616d63 100644 --- a/pkg/pinger/pinger_suite_test.go +++ b/pkg/pinger/pinger_suite_test.go @@ -23,8 +23,17 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/log/kzerolog" ) +func init() { + kzerolog.AddFlags(nil) +} + +var _ = BeforeSuite(func() { + kzerolog.InitK8sLogging() +}) + func TestPinger(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Pinger Suite") From 404cfa06e841b473882d4a29f45b29ca1e3a559b Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 1 Apr 2025 10:50:56 -0400 Subject: [PATCH 2/4] Use pinger Controller in cable engine health checker Signed-off-by: Tom Pantelis --- .../healthchecker/healthchecker.go | 118 ++++-------------- .../healthchecker/healthchecker_suite_test.go | 3 + .../healthchecker/healthchecker_test.go | 107 +++++----------- pkg/cableengine/syncer/syncer_test.go | 20 +-- pkg/gateway/gateway.go | 17 +-- 5 files changed, 76 insertions(+), 189 deletions(-) diff --git a/pkg/cableengine/healthchecker/healthchecker.go b/pkg/cableengine/healthchecker/healthchecker.go index dadc0bf92..8e2b42b7f 100644 --- a/pkg/cableengine/healthchecker/healthchecker.go +++ b/pkg/cableengine/healthchecker/healthchecker.go @@ -19,9 +19,6 @@ limitations under the License. 
package healthchecker import ( - "sync" - "time" - "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/watcher" @@ -39,62 +36,52 @@ type Interface interface { } type Config struct { - WatcherConfig *watcher.Config - SupportedIPFamilies []k8snet.IPFamily - EndpointNamespace string - ClusterID string - PingInterval int - MaxPacketLossCount int - NewPinger func(pinger.Config) pinger.Interface + pinger.ControllerConfig + WatcherConfig watcher.Config + EndpointNamespace string + ClusterID string } type controller struct { - sync.RWMutex - pingers map[string]pinger.Interface - config *Config + config Config + pingerController pinger.Controller } var logger = log.Logger{Logger: logf.Log.WithName("HealthChecker")} func New(config *Config) (Interface, error) { - if len(config.SupportedIPFamilies) == 0 { - return nil, errors.New("SupportedIPFamilies must not be empty") - } - - controller := &controller{ - config: config, - pingers: map[string]pinger.Interface{}, + c := &controller{ + config: *config, + pingerController: pinger.NewController(config.ControllerConfig), } - config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{ + c.config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{ { Name: "HealthChecker Endpoint Controller", ResourceType: &submarinerv1.Endpoint{}, Handler: watcher.EventHandlerFuncs{ - OnCreateFunc: controller.endpointCreatedOrUpdated, - OnUpdateFunc: controller.endpointCreatedOrUpdated, - OnDeleteFunc: controller.endpointDeleted, + OnCreateFunc: c.endpointCreatedOrUpdated, + OnUpdateFunc: c.endpointCreatedOrUpdated, + OnDeleteFunc: c.endpointDeleted, }, SourceNamespace: config.EndpointNamespace, }, } - return controller, nil + return c, nil } func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec, ipFamily k8snet.IPFamily) *pinger.LatencyInfo { - h.RLock() - defer h.RUnlock() - - if pingerObject, found := h.pingers[endpoint.GetFamilyCableName(ipFamily)]; found { - 
return pingerObject.GetLatencyInfo() + pinger := h.pingerController.Get(endpoint, ipFamily) + if pinger != nil { + return pinger.GetLatencyInfo() } return nil } func (h *controller) Start(stopCh <-chan struct{}) error { - endpointWatcher, err := watcher.New(h.config.WatcherConfig) + endpointWatcher, err := watcher.New(&h.config.WatcherConfig) if err != nil { return errors.Wrapf(err, "error creating watcher") } @@ -110,14 +97,7 @@ func (h *controller) Start(stopCh <-chan struct{}) error { } func (h *controller) Stop() { - h.Lock() - defer h.Unlock() - - for _, p := range h.pingers { - p.Stop() - } - - h.pingers = map[string]pinger.Interface{} + h.pingerController.Stop() } func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool { @@ -128,68 +108,12 @@ func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool { return false } - h.Lock() - defer h.Unlock() - - for _, family := range h.config.SupportedIPFamilies { - healthCheckIP := endpointCreated.Spec.GetHealthCheckIP(family) - if healthCheckIP == "" { - logger.Infof("IPv%v HealthCheckIP for Endpoint %q is empty - will not monitor endpoint health", - family, endpointCreated.Name) - continue - } - - h.startPinger(endpointCreated.Spec.GetFamilyCableName(family), healthCheckIP) - } + h.pingerController.EndpointCreatedOrUpdated(&endpointCreated.Spec) return false } -func (h *controller) startPinger(familyCableName, healthCheckIP string) { - if pingerObject, found := h.pingers[familyCableName]; found { - if pingerObject.GetIP() == healthCheckIP { - return - } - - logger.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", familyCableName) - pingerObject.Stop() - delete(h.pingers, familyCableName) - } - - pingerConfig := pinger.Config{ - IP: healthCheckIP, - MaxPacketLossCount: h.config.MaxPacketLossCount, - } - - if h.config.PingInterval != 0 { - pingerConfig.Interval = time.Second * time.Duration(h.config.PingInterval) - } - - newPingerFunc := 
h.config.NewPinger - if newPingerFunc == nil { - newPingerFunc = pinger.NewPinger - } - - pingerObject := newPingerFunc(pingerConfig) - h.pingers[familyCableName] = pingerObject - pingerObject.Start() - - logger.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q", familyCableName, healthCheckIP) -} - func (h *controller) endpointDeleted(obj runtime.Object, _ int) bool { - endpointDeleted := obj.(*submarinerv1.Endpoint) - - h.Lock() - defer h.Unlock() - - for _, family := range h.config.SupportedIPFamilies { - familyCableName := endpointDeleted.Spec.GetFamilyCableName(family) - if pingerObject, found := h.pingers[familyCableName]; found { - pingerObject.Stop() - delete(h.pingers, familyCableName) - } - } - + h.pingerController.EndpointRemoved(&(obj.(*submarinerv1.Endpoint)).Spec) return false } diff --git a/pkg/cableengine/healthchecker/healthchecker_suite_test.go b/pkg/cableengine/healthchecker/healthchecker_suite_test.go index 010539710..af4a5c989 100644 --- a/pkg/cableengine/healthchecker/healthchecker_suite_test.go +++ b/pkg/cableengine/healthchecker/healthchecker_suite_test.go @@ -24,6 +24,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/submariner-io/admiral/pkg/log/kzerolog" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + kubeScheme "k8s.io/client-go/kubernetes/scheme" ) func init() { @@ -32,6 +34,7 @@ func init() { var _ = BeforeSuite(func() { kzerolog.InitK8sLogging() + Expect(submarinerv1.AddToScheme(kubeScheme.Scheme)).To(Succeed()) }) func TestHealthChecker(t *testing.T) { diff --git a/pkg/cableengine/healthchecker/healthchecker_test.go b/pkg/cableengine/healthchecker/healthchecker_test.go index 9630fb441..06b227fba 100644 --- a/pkg/cableengine/healthchecker/healthchecker_test.go +++ b/pkg/cableengine/healthchecker/healthchecker_test.go @@ -21,7 +21,6 @@ package healthchecker_test import ( "context" "fmt" - "time" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -32,7 +31,6 @@ import ( "github.com/submariner-io/submariner/pkg/pinger" "github.com/submariner-io/submariner/pkg/pinger/fake" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" kubeScheme "k8s.io/client-go/kubernetes/scheme" @@ -54,7 +52,6 @@ var _ = Describe("Controller", func() { endpoints dynamic.ResourceInterface pingerMap map[string]*fake.Pinger stopCh chan struct{} - checkInstantiation func(error) ) BeforeEach(func() { @@ -63,50 +60,40 @@ var _ = Describe("Controller", func() { healthCheckIP1: fake.NewPinger(healthCheckIP1), healthCheckIP2: fake.NewPinger(healthCheckIP2), } - - checkInstantiation = func(err error) { - Expect(err).To(Succeed()) - Expect(healthChecker.Start(stopCh)).To(Succeed()) - } }) JustBeforeEach(func() { stopCh = make(chan struct{}) - scheme := runtime.NewScheme() - Expect(submarinerv1.AddToScheme(scheme)).To(Succeed()) - Expect(submarinerv1.AddToScheme(kubeScheme.Scheme)).To(Succeed()) - dynamicClient := fakeClient.NewSimpleDynamicClient(scheme) + dynamicClient := fakeClient.NewSimpleDynamicClient(kubeScheme.Scheme) restMapper := test.GetRESTMapperFor(&submarinerv1.Endpoint{}) endpoints = dynamicClient.Resource(*test.GetGroupVersionResourceFor(restMapper, &submarinerv1.Endpoint{})).Namespace(namespace) var err error config := &healthchecker.Config{ - WatcherConfig: &watcher.Config{ + ControllerConfig: pinger.ControllerConfig{ + SupportedIPFamilies: supportedIPFamilies, + PingInterval: 3, + MaxPacketLossCount: 4, + NewPinger: func(pingerCfg pinger.Config) pinger.Interface { + p, ok := pingerMap[pingerCfg.IP] + Expect(ok).To(BeTrue()) + + return p + }, + }, + WatcherConfig: watcher.Config{ RestMapper: restMapper, Client: dynamicClient, - Scheme: scheme, }, - SupportedIPFamilies: supportedIPFamilies, - EndpointNamespace: namespace, - ClusterID: localClusterID, - PingInterval: 3, - MaxPacketLossCount: 4, - } - - 
config.NewPinger = func(pingerCfg pinger.Config) pinger.Interface { - defer GinkgoRecover() - Expect(pingerCfg.Interval).To(Equal(time.Second * time.Duration(config.PingInterval))) - Expect(pingerCfg.MaxPacketLossCount).To(Equal(config.MaxPacketLossCount)) - - p, ok := pingerMap[pingerCfg.IP] - Expect(ok).To(BeTrue()) - return p + EndpointNamespace: namespace, + ClusterID: localClusterID, } healthChecker, err = healthchecker.New(config) - checkInstantiation(err) + Expect(err).ToNot(HaveOccurred()) + Expect(healthChecker.Start(stopCh)).To(Succeed()) }) AfterEach(func() { @@ -280,60 +267,30 @@ var _ = Describe("Controller", func() { }) }) - When("a remote Endpoint is updated", func() { + When("a remote Endpoint is updated and the HealthCheckIP was changed", func() { var endpoint *submarinerv1.Endpoint + BeforeEach(func() { + pingerMap[healthCheckIP3] = fake.NewPinger(healthCheckIP3) + }) + JustBeforeEach(func() { endpoint = createEndpoint(remoteClusterID1, healthCheckIP1) pingerMap[healthCheckIP1].AwaitStart() }) - When("the HealthCheckIP was changed", func() { - BeforeEach(func() { - pingerMap[healthCheckIP3] = fake.NewPinger(healthCheckIP3) - }) - - It("should stop the Pinger and start a new one", func() { - endpoint.Spec.HealthCheckIPs = []string{healthCheckIP3} - - test.UpdateResource(endpoints, endpoint) - pingerMap[healthCheckIP1].AwaitStop() - pingerMap[healthCheckIP3].AwaitStart() - - latencyInfo := newLatencyInfo() - pingerMap[healthCheckIP3].SetLatencyInfo(latencyInfo) - Eventually(func() *pinger.LatencyInfo { - return healthChecker.GetLatencyInfo(&endpoint.Spec, k8snet.IPv4) - }).Should(Equal(latencyInfo)) - }) - }) - - When("the HealthCheckIP did not changed", func() { - It("should not start a new Pinger", func() { - endpoint.Spec.Hostname = "raiders" + It("should stop the Pinger and start a new one", func() { + endpoint.Spec.HealthCheckIPs = []string{healthCheckIP3} - test.UpdateResource(endpoints, endpoint) - pingerMap[healthCheckIP1].AwaitNoStop() - }) 
- }) - }) - - When("a remote Endpoint has no HealthCheckIP", func() { - It("should not start a Pinger", func() { - createEndpoint(remoteClusterID1, "") - pingerMap[healthCheckIP1].AwaitNoStart() - }) - }) - - When("no supported IP families are provided", func() { - BeforeEach(func() { - supportedIPFamilies = nil - checkInstantiation = func(err error) { - Expect(err).To(HaveOccurred()) - } - }) + test.UpdateResource(endpoints, endpoint) + pingerMap[healthCheckIP1].AwaitStop() + pingerMap[healthCheckIP3].AwaitStart() - Specify("the health checker should fail", func() { + latencyInfo := newLatencyInfo() + pingerMap[healthCheckIP3].SetLatencyInfo(latencyInfo) + Eventually(func() *pinger.LatencyInfo { + return healthChecker.GetLatencyInfo(&endpoint.Spec, k8snet.IPv4) + }).Should(Equal(latencyInfo)) }) }) }) diff --git a/pkg/cableengine/syncer/syncer_test.go b/pkg/cableengine/syncer/syncer_test.go index 20f9c7012..349160836 100644 --- a/pkg/cableengine/syncer/syncer_test.go +++ b/pkg/cableengine/syncer/syncer_test.go @@ -549,19 +549,21 @@ func (t *testDriver) run() { var err error t.healthChecker, err = healthchecker.New(&healthchecker.Config{ - WatcherConfig: &watcher.Config{ + ControllerConfig: pinger.ControllerConfig{ + SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4}, + NewPinger: func(pingerCfg pinger.Config) pinger.Interface { + defer GinkgoRecover() + Expect(pingerCfg.IP).To(Equal(t.pinger.GetIP())) + return t.pinger + }, + }, + WatcherConfig: watcher.Config{ RestMapper: restMapper, Client: dynamicClient, Scheme: scheme, }, - SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4}, - EndpointNamespace: namespace, - ClusterID: t.engine.LocalEndPoint.Spec.ClusterID, - NewPinger: func(pingerCfg pinger.Config) pinger.Interface { - defer GinkgoRecover() - Expect(pingerCfg.IP).To(Equal(t.pinger.GetIP())) - return t.pinger - }, + EndpointNamespace: namespace, + ClusterID: t.engine.LocalEndPoint.Spec.ClusterID, }) Expect(err).To(Succeed()) diff --git 
a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 828829761..5aa4157b0 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -42,6 +42,7 @@ import ( "github.com/submariner-io/submariner/pkg/controllers/tunnel" "github.com/submariner-io/submariner/pkg/endpoint" "github.com/submariner-io/submariner/pkg/natdiscovery" + "github.com/submariner-io/submariner/pkg/pinger" "github.com/submariner-io/submariner/pkg/pod" "github.com/submariner-io/submariner/pkg/types" "github.com/submariner-io/submariner/pkg/versions" @@ -397,15 +398,15 @@ func (g *gatewayType) initCableHealthChecker() { if !g.Spec.HealthCheckEnabled { logger.Info("The CableEngine HealthChecker is disabled") } else { - watcherConfig := g.WatcherConfig - g.cableHealthChecker, err = healthchecker.New(&healthchecker.Config{ - WatcherConfig: &watcherConfig, - SupportedIPFamilies: g.Spec.GetIPFamilies(), - EndpointNamespace: g.Spec.Namespace, - ClusterID: g.Spec.ClusterID, - PingInterval: g.Spec.HealthCheckInterval, - MaxPacketLossCount: g.Spec.HealthCheckMaxPacketLossCount, + ControllerConfig: pinger.ControllerConfig{ + SupportedIPFamilies: g.Spec.GetIPFamilies(), + PingInterval: g.Spec.HealthCheckInterval, + MaxPacketLossCount: g.Spec.HealthCheckMaxPacketLossCount, + }, + WatcherConfig: g.WatcherConfig, + EndpointNamespace: g.Spec.Namespace, + ClusterID: g.Spec.ClusterID, }) if err != nil { logger.Errorf(err, "Error creating healthChecker") From 2233eb3e032b1d8e7ade6fe9992d4f9a7fdcb619 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 1 Apr 2025 11:18:01 -0400 Subject: [PATCH 3/4] Use pinger Controller in route agent health checker Signed-off-by: Tom Pantelis --- .../handlers/handlers_test.go | 11 +- .../handlers/healthchecker/healthchecker.go | 120 ++++-------------- .../healthchecker/healthchecker_test.go | 58 +++------ pkg/routeagent_driver/main.go | 19 +-- 4 files changed, 63 insertions(+), 145 deletions(-) diff --git a/pkg/routeagent_driver/handlers/handlers_test.go 
b/pkg/routeagent_driver/handlers/handlers_test.go index 2b0ea7a1e..16cf2765d 100644 --- a/pkg/routeagent_driver/handlers/handlers_test.go +++ b/pkg/routeagent_driver/handlers/handlers_test.go @@ -39,6 +39,7 @@ import ( netlinkAPI "github.com/submariner-io/submariner/pkg/netlink" fakenetlink "github.com/submariner-io/submariner/pkg/netlink/fake" fakePF "github.com/submariner-io/submariner/pkg/packetfilter/fake" + "github.com/submariner-io/submariner/pkg/pinger" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/calico" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/healthchecker" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/kubeproxy" @@ -53,6 +54,7 @@ import ( fakek8s "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + k8snet "k8s.io/utils/net" ) var _ = Describe("", func() { @@ -127,10 +129,11 @@ var _ = Describe("", func() { Expect(calicoHandler.Init(ctx)).To(Succeed()) healthCheckerHandler := healthchecker.New(&healthchecker.Config{ - PingInterval: 1, - MaxPacketLossCount: 1, - HealthCheckerEnabled: true, - RouteAgentUpdateInterval: 100 * time.Millisecond, + ControllerConfig: pinger.ControllerConfig{ + SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4}, + }, + HealthCheckerEnabled: false, + RouteAgentUpdateInterval: time.Hour, }, submClient.SubmarinerV1().RouteAgents(testing.Namespace), "v1", "test-node") Expect(healthCheckerHandler.Init(ctx)).To(Succeed()) }) diff --git a/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go b/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go index fe7fb86df..1fd5965e8 100644 --- a/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go +++ b/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go @@ -41,53 +41,44 @@ import ( ) type Config struct { - PingInterval int - MaxPacketLossCount int + pinger.ControllerConfig HealthCheckerEnabled bool RouteAgentUpdateInterval time.Duration - 
NewPinger func(pinger.Config) pinger.Interface } type controller struct { event.HandlerBase - sync.Mutex - pingers map[string]pinger.Interface - localNodeName string - version string - config *Config - stopCh chan struct{} - client v1typed.RouteAgentInterface + pingerController pinger.Controller + localNodeName string + version string + config Config + stopCh chan struct{} + stopOnce sync.Once + client v1typed.RouteAgentInterface } var logger = log.Logger{Logger: logf.Log.WithName("HealthChecker")} func New(config *Config, client v1typed.RouteAgentInterface, version, nodeName string) event.Handler { - controller := &controller{ - pingers: map[string]pinger.Interface{}, - config: config, - version: version, - client: client, - stopCh: make(chan struct{}), - localNodeName: nodeName, + return &controller{ + pingerController: pinger.NewController(config.ControllerConfig), + config: *config, + version: version, + client: client, + stopCh: make(chan struct{}), + localNodeName: nodeName, } - - return controller } func (h *controller) Stop() error { - h.Lock() - defer h.Unlock() - - for _, p := range h.pingers { - p.Stop() - } - - h.pingers = map[string]pinger.Interface{} + h.pingerController.Stop() - if h.stopCh != nil { - close(h.stopCh) - h.stopCh = nil - } + h.stopOnce.Do(func() { + if h.stopCh != nil { + close(h.stopCh) + h.stopCh = nil + } + }) err := h.client.Delete(context.TODO(), h.localNodeName, metav1.DeleteOptions{}) @@ -103,9 +94,6 @@ func (h *controller) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) erro } func (h *controller) RemoteEndpointUpdated(endpoint *submarinerv1.Endpoint) error { - h.Lock() - defer h.Unlock() - if !h.config.HealthCheckerEnabled || h.State().IsOnGateway() { return nil } @@ -118,57 +106,13 @@ func (h *controller) RemoteEndpointUpdated(endpoint *submarinerv1.Endpoint) erro func (h *controller) processEndpointCreatedOrUpdated(endpoint *submarinerv1.Endpoint) { logger.Infof("Processing Endpoint: %#v", endpoint) - if 
endpoint.Spec.GetHealthCheckIP(k8snet.IPv4) == "" || endpoint.Spec.CableName == "" { - logger.Infof("HealthCheckIP (%q) and/or CableName (%q) for Endpoint %q empty - will not monitor endpoint health", - endpoint.Spec.GetHealthCheckIP(k8snet.IPv4), endpoint.Spec.CableName, endpoint.Name) - return - } - - if pingerObject, found := h.pingers[endpoint.Spec.CableName]; found { - if pingerObject.GetIP() == endpoint.Spec.GetHealthCheckIP(k8snet.IPv4) { - return - } - - logger.Infof("HealthChecker is already running for %q - stopping", endpoint.Name) - pingerObject.Stop() - delete(h.pingers, endpoint.Spec.CableName) - } - - pingerConfig := pinger.Config{ - IP: endpoint.Spec.GetHealthCheckIP(k8snet.IPv4), - } - - if h.config.PingInterval != 0 { - pingerConfig.Interval = time.Second * time.Duration(h.config.PingInterval) - } - - if h.config.MaxPacketLossCount != 0 { - pingerConfig.MaxPacketLossCount = h.config.MaxPacketLossCount - } - - newPingerFunc := h.config.NewPinger - if newPingerFunc == nil { - newPingerFunc = pinger.NewPinger - } - - pingerObject := newPingerFunc(pingerConfig) - h.pingers[endpoint.Spec.CableName] = pingerObject - pingerObject.Start() - - logger.Infof("HealthChecker started pinger for CableName: %q with HealthCheckIP %q", - endpoint.Spec.CableName, endpoint.Spec.GetHealthCheckIP(k8snet.IPv4)) + h.pingerController.EndpointCreatedOrUpdated(&endpoint.Spec) } func (h *controller) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error { logger.Infof("Processing removed Endpoint %q", endpoint.Spec.CableName) - h.Lock() - defer h.Unlock() - - if pingerObject, found := h.pingers[endpoint.Spec.CableName]; found { - pingerObject.Stop() - delete(h.pingers, endpoint.Spec.CableName) - } + h.pingerController.EndpointRemoved(&endpoint.Spec) return nil } @@ -181,9 +125,6 @@ func (h *controller) Init(_ context.Context) error { //nolint:contextcheck // Ignore "should pass the context parameter" go func() { wait.Until(func() { - h.Lock() - defer h.Unlock() - 
h.syncRouteAgentStatus() }, h.config.RouteAgentUpdateInterval, h.stopCh) }() @@ -193,9 +134,6 @@ func (h *controller) Init(_ context.Context) error { // TransitionToNonGateway is called once for each transition of the local node from Gateway to a non-Gateway. func (h *controller) TransitionToNonGateway() error { - h.Lock() - defer h.Unlock() - if h.config.HealthCheckerEnabled { remoteEndpoints := h.State().GetRemoteEndpoints() @@ -209,14 +147,8 @@ func (h *controller) TransitionToNonGateway() error { // TransitionToGateway is called once for each transition of the local node from non-Gateway to a Gateway. func (h *controller) TransitionToGateway() error { - h.Lock() - defer h.Unlock() - if h.config.HealthCheckerEnabled { - for i := range h.pingers { - h.pingers[i].Stop() - delete(h.pingers, i) - } + h.pingerController.Stop() } h.syncRouteAgentStatus() @@ -248,7 +180,7 @@ func (h *controller) syncRouteAgentStatus() { } else if h.State().IsOnGateway() { connectionStatus = submarinerv1.ConnectionNone statusMessage = "Health check is not performed on gateway nodes" - } else if pingerObject, found := h.pingers[remoteEndpoints[i].Spec.CableName]; found { + } else if pingerObject := h.pingerController.Get(&remoteEndpoints[i].Spec, k8snet.IPv4); pingerObject != nil { latencyInfo := pingerObject.GetLatencyInfo() if latencyInfo != nil { switch latencyInfo.ConnectionStatus { diff --git a/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go b/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go index 4fb28f309..17134e145 100644 --- a/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go +++ b/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go @@ -172,35 +172,17 @@ var _ = Describe("RemoteEndpoint latency info", func() { }) }) - When("a remote Endpoint is updated", func() { - Context("and the HealthCheckIP was changed", func() { - It("should stop the pinger and start a new one", func() { - endpoint1 := 
t.CreateEndpoint(t.newSubmEndpoint(healthCheckIP1)) - - t.pingerMap[healthCheckIP1].AwaitStart() - - endpoint1.Spec.HealthCheckIPs = []string{healthCheckIP2} - - t.UpdateEndpoint(endpoint1) - t.pingerMap[healthCheckIP1].AwaitStop() - t.pingerMap[healthCheckIP2].AwaitStart() - }) - }) - - Context("and the HealthCheckIP did not change", func() { - It("should not start a new pinger", func() { - endpoint1 := t.CreateEndpoint(t.newSubmEndpoint(healthCheckIP1)) - t.pingerMap[healthCheckIP1].AwaitStart() + When("a remote Endpoint is updated and the HealthCheckIP was changed", func() { + It("should stop the pinger and start a new one", func() { + endpoint1 := t.CreateEndpoint(t.newSubmEndpoint(healthCheckIP1)) - endpoint1.Spec.Hostname = "newHostName" - t.UpdateEndpoint(endpoint1) + t.pingerMap[healthCheckIP1].AwaitStart() - pingerObject, found := t.pingerMap[endpoint1.Spec.GetHealthCheckIP(k8snet.IPv4)] - Expect(found).To(BeTrue()) - Expect(pingerObject.GetIP()).To(Equal(healthCheckIP1)) + endpoint1.Spec.HealthCheckIPs = []string{healthCheckIP2} - t.pingerMap[healthCheckIP1].AwaitNoStop() - }) + t.UpdateEndpoint(endpoint1) + t.pingerMap[healthCheckIP1].AwaitStop() + t.pingerMap[healthCheckIP2].AwaitStart() }) }) @@ -291,22 +273,22 @@ func newTestDriver() *testDriver { JustBeforeEach(func() { config := &healthchecker.Config{ - PingInterval: 1, // Set interval to 1 second for faster testing - MaxPacketLossCount: 1, + ControllerConfig: pinger.ControllerConfig{ + SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4}, + PingInterval: 1, // Set interval to 1 second for faster testing + MaxPacketLossCount: 1, + NewPinger: func(pingerCfg pinger.Config) pinger.Interface { + defer GinkgoRecover() + p, ok := t.pingerMap[pingerCfg.IP] + Expect(ok).To(BeTrue()) + + return p + }, + }, HealthCheckerEnabled: t.healthcheckerEnabled, RouteAgentUpdateInterval: 100 * time.Millisecond, } - config.NewPinger = func(pingerCfg pinger.Config) pinger.Interface { - defer GinkgoRecover() - 
Expect(pingerCfg.Interval).To(Equal(time.Second * time.Duration(config.PingInterval))) - Expect(pingerCfg.MaxPacketLossCount).To(Equal(config.MaxPacketLossCount)) - - p, ok := t.pingerMap[pingerCfg.IP] - Expect(ok).To(BeTrue()) - - return p - } t.handler = healthchecker.New(config, t.client, "v1", localNodeName) t.Start(t.handler) diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index 374e6f790..e34b46814 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -43,6 +43,7 @@ import ( "github.com/submariner-io/submariner/pkg/event/controller" "github.com/submariner-io/submariner/pkg/node" pfconfigure "github.com/submariner-io/submariner/pkg/packetfilter/configure" + "github.com/submariner-io/submariner/pkg/pinger" "github.com/submariner-io/submariner/pkg/routeagent_driver/cabledriver" "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/calico" @@ -133,13 +134,6 @@ func main() { localNode, err := node.GetLocalNode(ctx, k8sClientSet) logger.FatalOnError(err, "Error getting information on the local node") - healthcheckerConfig := &healthchecker.Config{ - PingInterval: submSpec.HealthCheckInterval * 60, - MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount, - HealthCheckerEnabled: submSpec.HealthCheckEnabled, - RouteAgentUpdateInterval: 60 * time.Second, - } - registry, err := event.NewRegistry(ctx, "routeagent_driver", np, kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr), ovn.NewHandler(&ovn.HandlerConfig{ @@ -158,8 +152,15 @@ func main() { cabledriver.NewVXLANCleanup(), mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(localNode)), calico.NewCalicoIPPoolHandler(cfg, env.Namespace, k8sClientSet), - healthchecker.New(healthcheckerConfig, - smClientset.SubmarinerV1().RouteAgents(submSpec.Namespace), versions.Submariner(), localNode.Name)) + healthchecker.New(&healthchecker.Config{ + 
ControllerConfig: pinger.ControllerConfig{ + SupportedIPFamilies: submSpec.GetIPFamilies(), + PingInterval: submSpec.HealthCheckInterval * 60, + MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount, + }, + HealthCheckerEnabled: submSpec.HealthCheckEnabled, + RouteAgentUpdateInterval: 60 * time.Second, + }, smClientset.SubmarinerV1().RouteAgents(submSpec.Namespace), versions.Submariner(), localNode.Name)) logger.FatalOnError(err, "Error registering the handlers") From 40d50ce0f37d003b2ec0f1737cf38f7e7acc3892 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 2 Apr 2025 11:25:57 -0400 Subject: [PATCH 4/4] Enhance route agent health checker to verify IPv6 connectivity Separate RemoteEndpoint entries are added for IPv4 and IPv6. Signed-off-by: Tom Pantelis --- .../handlers/healthchecker/healthchecker.go | 113 +++++++++------ .../healthchecker/healthchecker_test.go | 133 +++++++++++++++--- 2 files changed, 182 insertions(+), 64 deletions(-) diff --git a/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go b/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go index 1fd5965e8..f13fc9a70 100644 --- a/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go +++ b/pkg/routeagent_driver/handlers/healthchecker/healthchecker.go @@ -21,6 +21,7 @@ package healthchecker import ( "context" "fmt" + "slices" "sync" "time" @@ -164,58 +165,82 @@ func (h *controller) GetName() string { return "routeAgent-health-checker" } -func (h *controller) syncRouteAgentStatus() { - routeAgent := h.generateRouteAgentObject() - remoteEndpoints := h.State().GetRemoteEndpoints() - - for i := range remoteEndpoints { - var connectionStatus submarinerv1.ConnectionStatus - var remoteEndpoint submarinerv1.RemoteEndpoint - var statusMessage string - var latencyRTT *submarinerv1.LatencyRTTSpec - - if !h.config.HealthCheckerEnabled { - connectionStatus = submarinerv1.ConnectionNone - statusMessage = "Health check is not enabled" - } else if h.State().IsOnGateway() { - 
connectionStatus = submarinerv1.ConnectionNone - statusMessage = "Health check is not performed on gateway nodes" - } else if pingerObject := h.pingerController.Get(&remoteEndpoints[i].Spec, k8snet.IPv4); pingerObject != nil { - latencyInfo := pingerObject.GetLatencyInfo() - if latencyInfo != nil { - switch latencyInfo.ConnectionStatus { - case pinger.Connected: - connectionStatus = submarinerv1.Connected - statusMessage = "" - latencyRTT = &submarinerv1.LatencyRTTSpec{ - Last: latencyInfo.Spec.Last, - Min: latencyInfo.Spec.Min, - Average: latencyInfo.Spec.Average, - Max: latencyInfo.Spec.Max, - StdDev: latencyInfo.Spec.StdDev, - } - case pinger.ConnectionError, pinger.ConnectionUnknown: - connectionStatus = submarinerv1.ConnectionError - statusMessage = latencyInfo.ConnectionError - } - } else { - connectionStatus = submarinerv1.Connecting +func (h *controller) createRemoteEndpoint(endpointSpec *submarinerv1.EndpointSpec, family k8snet.IPFamily) submarinerv1.RemoteEndpoint { + var ( + connectionStatus submarinerv1.ConnectionStatus + statusMessage string + latencyRTT *submarinerv1.LatencyRTTSpec + ) + + if !h.config.HealthCheckerEnabled { + connectionStatus = submarinerv1.ConnectionNone + statusMessage = "Health check is not enabled" + } else if h.State().IsOnGateway() { + connectionStatus = submarinerv1.ConnectionNone + statusMessage = "Health check is not performed on gateway nodes" + } else if pingerObject := h.pingerController.Get(endpointSpec, family); pingerObject != nil { + latencyInfo := pingerObject.GetLatencyInfo() + if latencyInfo != nil { + switch latencyInfo.ConnectionStatus { + case pinger.Connected: + connectionStatus = submarinerv1.Connected statusMessage = "" + latencyRTT = &submarinerv1.LatencyRTTSpec{ + Last: latencyInfo.Spec.Last, + Min: latencyInfo.Spec.Min, + Average: latencyInfo.Spec.Average, + Max: latencyInfo.Spec.Max, + StdDev: latencyInfo.Spec.StdDev, + } + case pinger.ConnectionError, pinger.ConnectionUnknown: + connectionStatus = 
submarinerv1.ConnectionError + statusMessage = latencyInfo.ConnectionError } } else { - connectionStatus = submarinerv1.ConnectionNone - statusMessage = "Health checker IP is not configured" + connectionStatus = submarinerv1.Connecting + statusMessage = "" } + } else { + connectionStatus = submarinerv1.ConnectionNone + statusMessage = fmt.Sprintf("IPv%s health check IP is not configured", family) + } + + remoteEndpoint := submarinerv1.RemoteEndpoint{ + Status: connectionStatus, + StatusMessage: statusMessage, + Spec: *endpointSpec, + LatencyRTT: latencyRTT, + } - remoteEndpoint = submarinerv1.RemoteEndpoint{ - Status: connectionStatus, - StatusMessage: statusMessage, - Spec: remoteEndpoints[i].Spec, - LatencyRTT: latencyRTT, + toPluralIPs := func(get func(family k8snet.IPFamily) string) []string { + ip := get(family) + if ip != "" { + return []string{ip} } - routeAgent.Status.RemoteEndpoints = append(routeAgent.Status.RemoteEndpoints, remoteEndpoint) + return nil + } + + remoteEndpoint.Spec.HealthCheckIPs = toPluralIPs(endpointSpec.GetHealthCheckIP) + remoteEndpoint.Spec.PublicIPs = toPluralIPs(endpointSpec.GetPublicIP) + remoteEndpoint.Spec.PrivateIPs = toPluralIPs(endpointSpec.GetPrivateIP) + + return remoteEndpoint +} + +func (h *controller) syncRouteAgentStatus() { + routeAgent := h.generateRouteAgentObject() + remoteEndpoints := h.State().GetRemoteEndpoints() + + for i := range remoteEndpoints { + for _, family := range h.config.SupportedIPFamilies { + if slices.Contains(remoteEndpoints[i].Spec.GetIPFamilies(), family) { + routeAgent.Status.RemoteEndpoints = append(routeAgent.Status.RemoteEndpoints, + h.createRemoteEndpoint(&remoteEndpoints[i].Spec, family)) + } + } } + // Use CreateOrUpdate to handle the RouteAgent resource _, err := util.CreateOrUpdate(context.TODO(), h.routeAgentResourceInterface(), routeAgent, func(existing *submarinerv1.RouteAgent) (*submarinerv1.RouteAgent, error) { diff --git 
a/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go b/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go index 17134e145..a9bd6f94d 100644 --- a/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go +++ b/pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go @@ -33,6 +33,7 @@ import ( "github.com/submariner-io/submariner/pkg/pinger" "github.com/submariner-io/submariner/pkg/pinger/fake" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/healthchecker" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic" dynamicfake "k8s.io/client-go/dynamic/fake" @@ -121,7 +122,7 @@ var _ = Describe("RemoteEndpoint latency info", func() { t.CreateEndpoint(t.newSubmEndpoint(healthCheckIP1)) t.pingerMap[healthCheckIP1].AwaitStart() - latencyInfo := t.newLatencyInfo() + latencyInfo := t.newLatencyInfo(k8snet.IPv4) t.setLatencyInfo(healthCheckIP1, latencyInfo) t.awaitRemoteEndpoint(func(ep *submarinerv1.RemoteEndpoint, g Gomega) { @@ -132,12 +133,14 @@ var _ = Describe("RemoteEndpoint latency info", func() { Context("with no HealthCheckIP", func() { It("should not start a pinger and should set the RemoteEndpoint Status to None", func() { - endpoint1 := t.CreateEndpoint(t.newSubmEndpoint("")) + endpoint := t.newSubmEndpoint() + endpoint.Spec.Subnets = []string{"2.2.2.2/24"} + t.CreateEndpoint(endpoint) t.pingerMap[healthCheckIP1].AwaitNoStart() t.awaitRemoteEndpoint(func(ep *submarinerv1.RemoteEndpoint, g Gomega) { g.Expect(ep.Status).To(Equal(submarinerv1.ConnectionNone)) - g.Expect(ep.Spec).To(Equal(endpoint1.Spec)) + g.Expect(ep.Spec).To(Equal(endpoint.Spec)) }) }) }) @@ -196,6 +199,74 @@ var _ = Describe("RemoteEndpoint latency info", func() { }) }) + When("a remote Endpoint with dual-stack health check IPs is created/deleted", func() { + const healthCheckIPv6 = "2001:db8:3333:4444:5555:6666:7777:8888" + + BeforeEach(func() { + 
t.supportedIPFamilies = []k8snet.IPFamily{k8snet.IPv4, k8snet.IPv6} + t.pingerMap[healthCheckIPv6] = fake.NewPinger(healthCheckIPv6) + }) + + It("should start/stop Pingers and return the correct LatencyInfo for both", func() { + endpoint := t.newSubmEndpoint(healthCheckIP1, healthCheckIPv6) + endpoint.Spec.PublicIPs = []string{"2002:0:0:1234::", "2.2.2.2"} + endpoint.Spec.PrivateIPs = []string{"2003:0:0:1234::", "3.3.3.3"} + + t.CreateEndpoint(endpoint) + t.pingerMap[healthCheckIP1].AwaitStart() + t.pingerMap[healthCheckIPv6].AwaitStart() + + ipv4LatencyInfo := t.newLatencyInfo(k8snet.IPv4) + t.setLatencyInfo(healthCheckIP1, ipv4LatencyInfo) + + ipv6LatencyInfo := t.newLatencyInfo(k8snet.IPv6) + t.setLatencyInfo(healthCheckIPv6, ipv6LatencyInfo) + + t.awaitRouteAgent(func(ra *submarinerv1.RouteAgent, g Gomega) { + epMap := map[string]*submarinerv1.RemoteEndpoint{} + for i := range ra.Status.RemoteEndpoints { + g.Expect(ra.Status.RemoteEndpoints[i].Spec.HealthCheckIPs).To(HaveLen(1)) + epMap[ra.Status.RemoteEndpoints[i].Spec.HealthCheckIPs[0]] = &ra.Status.RemoteEndpoints[i] + } + + ipv4Endpoint := epMap[healthCheckIP1] + g.Expect(ipv4Endpoint).ToNot(BeNil(), "RemoteEndpoint not found for IPv4 health check IP %q", healthCheckIP1) + g.Expect(ipv4Endpoint.Status).To(Equal(submarinerv1.Connected)) + g.Expect(ipv4Endpoint.LatencyRTT).To(Equal(ipv4LatencyInfo.Spec)) + + spec := endpoint.Spec + spec.HealthCheckIPs = []string{healthCheckIP1} + spec.PublicIPs = []string{endpoint.Spec.GetPublicIP(k8snet.IPv4)} + spec.PrivateIPs = []string{endpoint.Spec.GetPrivateIP(k8snet.IPv4)} + g.Expect(ipv4Endpoint.Spec).To(Equal(spec)) + + ipv6Endpoint := epMap[healthCheckIPv6] + g.Expect(ipv6Endpoint).ToNot(BeNil(), "RemoteEndpoint not found for IPv6 health check IP %q", healthCheckIPv6) + g.Expect(ipv6Endpoint.Status).To(Equal(submarinerv1.Connected)) + g.Expect(ipv6Endpoint.LatencyRTT).To(Equal(ipv6LatencyInfo.Spec)) + + spec = endpoint.Spec + spec.HealthCheckIPs =
[]string{healthCheckIPv6} + spec.PublicIPs = []string{endpoint.Spec.GetPublicIP(k8snet.IPv6)} + spec.PrivateIPs = []string{endpoint.Spec.GetPrivateIP(k8snet.IPv6)} + g.Expect(ipv6Endpoint.Spec).To(Equal(spec)) + + g.Expect(ra.Status.RemoteEndpoints).To(HaveLen(2)) + }) + + By("Deleting Endpoint") + + t.DeleteEndpoint(endpoint.Name) + + t.pingerMap[healthCheckIP1].AwaitStop() + t.pingerMap[healthCheckIPv6].AwaitStop() + + t.awaitRouteAgent(func(ra *submarinerv1.RouteAgent, g Gomega) { + g.Expect(ra.Status.RemoteEndpoints).To(BeEmpty()) + }) + }) + }) + When("a pinger reports a connection error", func() { It(" should set the RemoteEndpoint Status to Error", func() { t.CreateEndpoint(t.newSubmEndpoint(healthCheckIP1)) @@ -240,13 +311,35 @@ var _ = Describe("Gateway transition", func() { }) }) +var _ = Describe("Stop", func() { + t := newTestDriver() + + It("should stop the Pingers and delete the RouteAgent resource", func() { + t.CreateEndpoint(t.newSubmEndpoint(healthCheckIP1)) + t.pingerMap[healthCheckIP1].AwaitStart() + + t.awaitRouteAgent(nil) + + Expect(t.handler.Stop()).To(Succeed()) + + t.pingerMap[healthCheckIP1].AwaitStop() + + Eventually(func(g Gomega) { + _, err := t.client.Get(context.TODO(), localNodeName, metav1.GetOptions{}) + g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }).Within(5 * time.Second).Should(Succeed()) + + Expect(t.handler.Stop()).To(Succeed()) + }) +}) + type testDriver struct { *eventtesting.ControllerSupport + supportedIPFamilies []k8snet.IPFamily pingerMap map[string]*fake.Pinger handler event.Handler endpoints dynamic.ResourceInterface client submarinerv1client.RouteAgentInterface - stopCh chan struct{} healthcheckerEnabled bool } @@ -256,7 +349,7 @@ func newTestDriver() *testDriver { } BeforeEach(func() { - t.stopCh = make(chan struct{}) + t.supportedIPFamilies = []k8snet.IPFamily{k8snet.IPv4} t.healthcheckerEnabled = true clientset := fakeClient.NewSimpleClientset() @@ -274,7 +367,7 @@ func newTestDriver() *testDriver { 
JustBeforeEach(func() { config := &healthchecker.Config{ ControllerConfig: pinger.ControllerConfig{ - SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4}, + SupportedIPFamilies: t.supportedIPFamilies, PingInterval: 1, // Set interval to 1 second for faster testing MaxPacketLossCount: 1, NewPinger: func(pingerCfg pinger.Config) pinger.Interface { @@ -294,19 +387,19 @@ func newTestDriver() *testDriver { t.Start(t.handler) }) - AfterEach(func() { - close(t.stopCh) - }) - return t } -func (t *testDriver) newSubmEndpoint(healthCheckIP string) *submarinerv1.Endpoint { +func (t *testDriver) newSubmEndpoint(healthCheckIPs ...string) *submarinerv1.Endpoint { endpointSpec := &submarinerv1.EndpointSpec{ - ClusterID: remoteClusterID, - CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", remoteClusterID), + ClusterID: remoteClusterID, + CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", remoteClusterID), + HealthCheckIPs: healthCheckIPs, + } + + for _, ip := range healthCheckIPs { + endpointSpec.Subnets = append(endpointSpec.Subnets, ip+"/24") } - endpointSpec.HealthCheckIPs = []string{healthCheckIP} endpointName, err := endpointSpec.GenerateName() Expect(err).To(Succeed()) @@ -322,15 +415,15 @@ func (t *testDriver) newSubmEndpoint(healthCheckIP string) *submarinerv1.Endpoin return endpoint } -func (t *testDriver) newLatencyInfo() *pinger.LatencyInfo { +func (t *testDriver) newLatencyInfo(family k8snet.IPFamily) *pinger.LatencyInfo { return &pinger.LatencyInfo{ ConnectionStatus: pinger.Connected, Spec: &submarinerv1.LatencyRTTSpec{ - Last: "82ms", - Min: "80ms", - Average: "85ms", - Max: "89ms", - StdDev: "5ms", + Last: string(family) + "82ms", + Min: string(family) + "80ms", + Average: string(family) + "85ms", + Max: string(family) + "89ms", + StdDev: string(family) + "5ms", }, } }