Skip to content

Enhance route agent health checker to verify IPv6 connectivity #3380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 21 additions & 97 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ limitations under the License.
package healthchecker

import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/watcher"
Expand All @@ -39,62 +36,52 @@ type Interface interface {
}

type Config struct {
WatcherConfig *watcher.Config
SupportedIPFamilies []k8snet.IPFamily
EndpointNamespace string
ClusterID string
PingInterval int
MaxPacketLossCount int
NewPinger func(pinger.Config) pinger.Interface
pinger.ControllerConfig
WatcherConfig watcher.Config
EndpointNamespace string
ClusterID string
}

type controller struct {
sync.RWMutex
pingers map[string]pinger.Interface
config *Config
config Config
pingerController pinger.Controller
}

var logger = log.Logger{Logger: logf.Log.WithName("HealthChecker")}

func New(config *Config) (Interface, error) {
if len(config.SupportedIPFamilies) == 0 {
return nil, errors.New("SupportedIPFamilies must not be empty")
}

controller := &controller{
config: config,
pingers: map[string]pinger.Interface{},
c := &controller{
config: *config,
pingerController: pinger.NewController(config.ControllerConfig),
}

config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{
c.config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{
{
Name: "HealthChecker Endpoint Controller",
ResourceType: &submarinerv1.Endpoint{},
Handler: watcher.EventHandlerFuncs{
OnCreateFunc: controller.endpointCreatedOrUpdated,
OnUpdateFunc: controller.endpointCreatedOrUpdated,
OnDeleteFunc: controller.endpointDeleted,
OnCreateFunc: c.endpointCreatedOrUpdated,
OnUpdateFunc: c.endpointCreatedOrUpdated,
OnDeleteFunc: c.endpointDeleted,
},
SourceNamespace: config.EndpointNamespace,
},
}

return controller, nil
return c, nil
}

func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec, ipFamily k8snet.IPFamily) *pinger.LatencyInfo {
h.RLock()
defer h.RUnlock()

if pingerObject, found := h.pingers[endpoint.GetFamilyCableName(ipFamily)]; found {
return pingerObject.GetLatencyInfo()
pinger := h.pingerController.Get(endpoint, ipFamily)
if pinger != nil {
return pinger.GetLatencyInfo()
}

return nil
}

func (h *controller) Start(stopCh <-chan struct{}) error {
endpointWatcher, err := watcher.New(h.config.WatcherConfig)
endpointWatcher, err := watcher.New(&h.config.WatcherConfig)
if err != nil {
return errors.Wrapf(err, "error creating watcher")
}
Expand All @@ -110,14 +97,7 @@ func (h *controller) Start(stopCh <-chan struct{}) error {
}

func (h *controller) Stop() {
h.Lock()
defer h.Unlock()

for _, p := range h.pingers {
p.Stop()
}

h.pingers = map[string]pinger.Interface{}
h.pingerController.Stop()
}

func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool {
Expand All @@ -128,68 +108,12 @@ func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool {
return false
}

h.Lock()
defer h.Unlock()

for _, family := range h.config.SupportedIPFamilies {
healthCheckIP := endpointCreated.Spec.GetHealthCheckIP(family)
if healthCheckIP == "" {
logger.Infof("IPv%v HealthCheckIP for Endpoint %q is empty - will not monitor endpoint health",
family, endpointCreated.Name)
continue
}

h.startPinger(endpointCreated.Spec.GetFamilyCableName(family), healthCheckIP)
}
h.pingerController.EndpointCreatedOrUpdated(&endpointCreated.Spec)

return false
}

func (h *controller) startPinger(familyCableName, healthCheckIP string) {
if pingerObject, found := h.pingers[familyCableName]; found {
if pingerObject.GetIP() == healthCheckIP {
return
}

logger.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", familyCableName)
pingerObject.Stop()
delete(h.pingers, familyCableName)
}

pingerConfig := pinger.Config{
IP: healthCheckIP,
MaxPacketLossCount: h.config.MaxPacketLossCount,
}

if h.config.PingInterval != 0 {
pingerConfig.Interval = time.Second * time.Duration(h.config.PingInterval)
}

newPingerFunc := h.config.NewPinger
if newPingerFunc == nil {
newPingerFunc = pinger.NewPinger
}

pingerObject := newPingerFunc(pingerConfig)
h.pingers[familyCableName] = pingerObject
pingerObject.Start()

logger.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q", familyCableName, healthCheckIP)
}

func (h *controller) endpointDeleted(obj runtime.Object, _ int) bool {
endpointDeleted := obj.(*submarinerv1.Endpoint)

h.Lock()
defer h.Unlock()

for _, family := range h.config.SupportedIPFamilies {
familyCableName := endpointDeleted.Spec.GetFamilyCableName(family)
if pingerObject, found := h.pingers[familyCableName]; found {
pingerObject.Stop()
delete(h.pingers, familyCableName)
}
}

h.pingerController.EndpointRemoved(&(obj.(*submarinerv1.Endpoint)).Spec)
return false
}
3 changes: 3 additions & 0 deletions pkg/cableengine/healthchecker/healthchecker_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/log/kzerolog"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
kubeScheme "k8s.io/client-go/kubernetes/scheme"
)

func init() {
Expand All @@ -32,6 +34,7 @@ func init() {

var _ = BeforeSuite(func() {
kzerolog.InitK8sLogging()
Expect(submarinerv1.AddToScheme(kubeScheme.Scheme)).To(Succeed())
})

func TestHealthChecker(t *testing.T) {
Expand Down
107 changes: 32 additions & 75 deletions pkg/cableengine/healthchecker/healthchecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package healthchecker_test
import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/submariner-io/submariner/pkg/pinger"
"github.com/submariner-io/submariner/pkg/pinger/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
fakeClient "k8s.io/client-go/dynamic/fake"
kubeScheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -54,7 +52,6 @@ var _ = Describe("Controller", func() {
endpoints dynamic.ResourceInterface
pingerMap map[string]*fake.Pinger
stopCh chan struct{}
checkInstantiation func(error)
)

BeforeEach(func() {
Expand All @@ -63,50 +60,40 @@ var _ = Describe("Controller", func() {
healthCheckIP1: fake.NewPinger(healthCheckIP1),
healthCheckIP2: fake.NewPinger(healthCheckIP2),
}

checkInstantiation = func(err error) {
Expect(err).To(Succeed())
Expect(healthChecker.Start(stopCh)).To(Succeed())
}
})

JustBeforeEach(func() {
stopCh = make(chan struct{})
scheme := runtime.NewScheme()
Expect(submarinerv1.AddToScheme(scheme)).To(Succeed())
Expect(submarinerv1.AddToScheme(kubeScheme.Scheme)).To(Succeed())

dynamicClient := fakeClient.NewSimpleDynamicClient(scheme)
dynamicClient := fakeClient.NewSimpleDynamicClient(kubeScheme.Scheme)
restMapper := test.GetRESTMapperFor(&submarinerv1.Endpoint{})
endpoints = dynamicClient.Resource(*test.GetGroupVersionResourceFor(restMapper, &submarinerv1.Endpoint{})).Namespace(namespace)

var err error

config := &healthchecker.Config{
WatcherConfig: &watcher.Config{
ControllerConfig: pinger.ControllerConfig{
SupportedIPFamilies: supportedIPFamilies,
PingInterval: 3,
MaxPacketLossCount: 4,
NewPinger: func(pingerCfg pinger.Config) pinger.Interface {
p, ok := pingerMap[pingerCfg.IP]
Expect(ok).To(BeTrue())

return p
},
},
WatcherConfig: watcher.Config{
RestMapper: restMapper,
Client: dynamicClient,
Scheme: scheme,
},
SupportedIPFamilies: supportedIPFamilies,
EndpointNamespace: namespace,
ClusterID: localClusterID,
PingInterval: 3,
MaxPacketLossCount: 4,
}

config.NewPinger = func(pingerCfg pinger.Config) pinger.Interface {
defer GinkgoRecover()
Expect(pingerCfg.Interval).To(Equal(time.Second * time.Duration(config.PingInterval)))
Expect(pingerCfg.MaxPacketLossCount).To(Equal(config.MaxPacketLossCount))

p, ok := pingerMap[pingerCfg.IP]
Expect(ok).To(BeTrue())
return p
EndpointNamespace: namespace,
ClusterID: localClusterID,
}

healthChecker, err = healthchecker.New(config)
checkInstantiation(err)
Expect(err).ToNot(HaveOccurred())
Expect(healthChecker.Start(stopCh)).To(Succeed())
})

AfterEach(func() {
Expand Down Expand Up @@ -280,60 +267,30 @@ var _ = Describe("Controller", func() {
})
})

When("a remote Endpoint is updated", func() {
When("a remote Endpoint is updated and the HealthCheckIP was changed", func() {
var endpoint *submarinerv1.Endpoint

BeforeEach(func() {
pingerMap[healthCheckIP3] = fake.NewPinger(healthCheckIP3)
})

JustBeforeEach(func() {
endpoint = createEndpoint(remoteClusterID1, healthCheckIP1)
pingerMap[healthCheckIP1].AwaitStart()
})

When("the HealthCheckIP was changed", func() {
BeforeEach(func() {
pingerMap[healthCheckIP3] = fake.NewPinger(healthCheckIP3)
})

It("should stop the Pinger and start a new one", func() {
endpoint.Spec.HealthCheckIPs = []string{healthCheckIP3}

test.UpdateResource(endpoints, endpoint)
pingerMap[healthCheckIP1].AwaitStop()
pingerMap[healthCheckIP3].AwaitStart()

latencyInfo := newLatencyInfo()
pingerMap[healthCheckIP3].SetLatencyInfo(latencyInfo)
Eventually(func() *pinger.LatencyInfo {
return healthChecker.GetLatencyInfo(&endpoint.Spec, k8snet.IPv4)
}).Should(Equal(latencyInfo))
})
})

When("the HealthCheckIP did not changed", func() {
It("should not start a new Pinger", func() {
endpoint.Spec.Hostname = "raiders"
It("should stop the Pinger and start a new one", func() {
endpoint.Spec.HealthCheckIPs = []string{healthCheckIP3}

test.UpdateResource(endpoints, endpoint)
pingerMap[healthCheckIP1].AwaitNoStop()
})
})
})

When("a remote Endpoint has no HealthCheckIP", func() {
It("should not start a Pinger", func() {
createEndpoint(remoteClusterID1, "")
pingerMap[healthCheckIP1].AwaitNoStart()
})
})

When("no supported IP families are provided", func() {
BeforeEach(func() {
supportedIPFamilies = nil
checkInstantiation = func(err error) {
Expect(err).To(HaveOccurred())
}
})
test.UpdateResource(endpoints, endpoint)
pingerMap[healthCheckIP1].AwaitStop()
pingerMap[healthCheckIP3].AwaitStart()

Specify("the health checker should fail", func() {
latencyInfo := newLatencyInfo()
pingerMap[healthCheckIP3].SetLatencyInfo(latencyInfo)
Eventually(func() *pinger.LatencyInfo {
return healthChecker.GetLatencyInfo(&endpoint.Spec, k8snet.IPv4)
}).Should(Equal(latencyInfo))
})
})
})
20 changes: 11 additions & 9 deletions pkg/cableengine/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,19 +549,21 @@ func (t *testDriver) run() {
var err error

t.healthChecker, err = healthchecker.New(&healthchecker.Config{
WatcherConfig: &watcher.Config{
ControllerConfig: pinger.ControllerConfig{
SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4},
NewPinger: func(pingerCfg pinger.Config) pinger.Interface {
defer GinkgoRecover()
Expect(pingerCfg.IP).To(Equal(t.pinger.GetIP()))
return t.pinger
},
},
WatcherConfig: watcher.Config{
RestMapper: restMapper,
Client: dynamicClient,
Scheme: scheme,
},
SupportedIPFamilies: []k8snet.IPFamily{k8snet.IPv4},
EndpointNamespace: namespace,
ClusterID: t.engine.LocalEndPoint.Spec.ClusterID,
NewPinger: func(pingerCfg pinger.Config) pinger.Interface {
defer GinkgoRecover()
Expect(pingerCfg.IP).To(Equal(t.pinger.GetIP()))
return t.pinger
},
EndpointNamespace: namespace,
ClusterID: t.engine.LocalEndPoint.Spec.ClusterID,
})
Expect(err).To(Succeed())

Expand Down
Loading
Loading