Skip to content

Commit 3b08839

Browse files
authored
Reconcile SecondaryNetwork OVS ports after Agent restart (#6853)
Restore state of SecondaryNetwork controller on restart, based on what is available in OVSDB. Using the "primary" interface store as a source of truth, we delete stale OVS ports if a Pod no longer exists after a restart. Fixes #6578 Fixes #5623 Signed-off-by: KMAnju-2021 <[email protected]>
1 parent d64f559 commit 3b08839

24 files changed

+562
-170
lines changed

cmd/antrea-agent/agent.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,19 @@ func run(o *Options) error {
649649
}
650650
}
651651

652+
// Secondary network controller should be created before CNIServer.Run() to make sure no Pod CNI updates will be missed.
653+
var secondaryNetworkController *secondarynetwork.Controller
654+
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
655+
secondaryNetworkController, err = secondarynetwork.NewController(
656+
o.config.ClientConnection, o.config.KubeAPIServerOverride,
657+
k8sClient, localPodInformer.Get(),
658+
podUpdateChannel, ifaceStore,
659+
&o.config.SecondaryNetwork, ovsdbConnection)
660+
if err != nil {
661+
return fmt.Errorf("failed to create secondary network controller: %w", err)
662+
}
663+
}
664+
652665
var traceflowController *traceflow.Controller
653666
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
654667
traceflowController = traceflow.NewTraceflowController(
@@ -763,18 +776,6 @@ func run(o *Options) error {
763776
go ipamController.Run(stopCh)
764777
}
765778

766-
var secondaryNetworkController *secondarynetwork.Controller
767-
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
768-
secondaryNetworkController, err = secondarynetwork.NewController(
769-
o.config.ClientConnection, o.config.KubeAPIServerOverride,
770-
k8sClient, localPodInformer.Get(),
771-
podUpdateChannel,
772-
&o.config.SecondaryNetwork, ovsdbConnection)
773-
if err != nil {
774-
return fmt.Errorf("failed to create secondary network controller: %w", err)
775-
}
776-
}
777-
778779
var bgpController *bgp.Controller
779780
if features.DefaultFeatureGate.Enabled(features.BGPPolicy) {
780781
bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies()

pkg/agent/secondarynetwork/init.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
componentbaseconfig "k8s.io/component-base/config"
2525
"k8s.io/klog/v2"
2626

27+
"antrea.io/antrea/pkg/agent/interfacestore"
2728
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
2829
agentconfig "antrea.io/antrea/pkg/config/agent"
2930
"antrea.io/antrea/pkg/ovs/ovsconfig"
@@ -47,6 +48,7 @@ func NewController(
4748
k8sClient clientset.Interface,
4849
podInformer cache.SharedIndexInformer,
4950
podUpdateSubscriber channel.Subscriber,
51+
primaryInterfaceStore interfacestore.InterfaceStore,
5052
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
5153
) (*Controller, error) {
5254
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
@@ -65,7 +67,7 @@ func NewController(
6567
// k8s.v1.cni.cncf.io/networks Annotation defined.
6668
podWatchController, err := podwatch.NewPodController(
6769
k8sClient, netAttachDefClient, podInformer,
68-
podUpdateSubscriber, ovsBridgeClient)
70+
podUpdateSubscriber, primaryInterfaceStore, ovsBridgeClient)
6971
if err != nil {
7072
return nil, err
7173
}

pkg/agent/secondarynetwork/podwatch/controller.go

+85
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func NewPodController(
103103
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
104104
podInformer cache.SharedIndexInformer,
105105
podUpdateSubscriber channel.Subscriber,
106+
primaryInterfaceStore interfacestore.InterfaceStore,
106107
ovsBridgeClient ovsconfig.OVSBridgeClient,
107108
) (*PodController, error) {
108109
ifaceStore := interfacestore.NewInterfaceStore()
@@ -134,6 +135,19 @@ func NewPodController(
134135
},
135136
resyncPeriod,
136137
)
138+
139+
// This is the case when secondary bridge is not configured and no VLAN interfaces at all. In this case,
140+
// we should skip both initializeSecondaryInterfaceStore and reconcileSecondaryInterfaces.
141+
if ovsBridgeClient != nil {
142+
if err := pc.initializeSecondaryInterfaceStore(); err != nil {
143+
return nil, fmt.Errorf("failed to initialize secondary interface store: %w", err)
144+
}
145+
146+
if err := pc.reconcileSecondaryInterfaces(primaryInterfaceStore); err != nil {
147+
return nil, fmt.Errorf("failed to restore CNI cache and reconcile secondary interfaces: %w", err)
148+
}
149+
}
150+
137151
// podUpdateSubscriber can be nil with test code.
138152
if podUpdateSubscriber != nil {
139153
// Subscribe Pod CNI add/del events.
@@ -521,3 +535,74 @@ func checkForPodSecondaryNetworkAttachement(pod *corev1.Pod) (string, bool) {
521535
return netObj, false
522536
}
523537
}
538+
539+
// initializeSecondaryInterfaceStore restores secondary interfaceStore when agent restarts.
540+
func (pc *PodController) initializeSecondaryInterfaceStore() error {
541+
ovsPorts, err := pc.ovsBridgeClient.GetPortList()
542+
if err != nil {
543+
return fmt.Errorf("failed to list OVS ports for the secondary bridge: %w", err)
544+
}
545+
546+
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
547+
for index := range ovsPorts {
548+
port := &ovsPorts[index]
549+
ovsPort := &interfacestore.OVSPortConfig{
550+
PortUUID: port.UUID,
551+
OFPort: port.OFPort,
552+
}
553+
554+
interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
555+
if !ok {
556+
klog.InfoS("Interface type is not set for the secondary bridge", "interfaceName", port.Name)
557+
continue
558+
}
559+
560+
var intf *interfacestore.InterfaceConfig
561+
switch interfaceType {
562+
case interfacestore.AntreaContainer:
563+
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort)
564+
default:
565+
klog.InfoS("Unknown Antrea interface type for the secondary bridge", "type", interfaceType)
566+
continue
567+
}
568+
569+
ifaceList = append(ifaceList, intf)
570+
}
571+
572+
pc.interfaceStore.Initialize(ifaceList)
573+
klog.InfoS("Successfully initialized the secondary bridge interface store")
574+
575+
return nil
576+
}
577+
578+
// reconcileSecondaryInterfaces restores cniCache when agent restarts using primary interfaceStore.
579+
func (pc *PodController) reconcileSecondaryInterfaces(primaryInterfaceStore interfacestore.InterfaceStore) error {
580+
knownInterfaces := primaryInterfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
581+
for _, containerConfig := range knownInterfaces {
582+
config := containerConfig.ContainerInterfaceConfig
583+
podKey := podKeyGet(config.PodName, config.PodNamespace)
584+
pc.cniCache.Store(podKey, &podCNIInfo{
585+
containerID: config.ContainerID,
586+
})
587+
}
588+
589+
var staleInterfaces []*interfacestore.InterfaceConfig
590+
// secondaryInterfaces is the list of interfaces currently in the secondary local cache.
591+
secondaryInterfaces := pc.interfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
592+
for _, containerConfig := range secondaryInterfaces {
593+
_, exists := primaryInterfaceStore.GetContainerInterface(containerConfig.ContainerID)
594+
if !exists || containerConfig.OFPort == -1 {
595+
// Deletes ports not in the CNI cache.
596+
staleInterfaces = append(staleInterfaces, containerConfig)
597+
}
598+
}
599+
600+
// If there are any stale interfaces, pass them to removeInterfaces()
601+
if len(staleInterfaces) > 0 {
602+
if err := pc.removeInterfaces(staleInterfaces); err != nil {
603+
klog.ErrorS(err, "Failed to remove stale secondary interfaces", "staleInterfaces", staleInterfaces)
604+
}
605+
}
606+
607+
return nil
608+
}

pkg/agent/secondarynetwork/podwatch/controller_test.go

+135-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"time"
3333

3434
current "github.com/containernetworking/cni/pkg/types/100"
35+
"github.com/google/uuid"
3536
netdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
3637
netdefclientfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
3738
"github.com/stretchr/testify/assert"
@@ -43,12 +44,15 @@ import (
4344
"k8s.io/client-go/kubernetes/fake"
4445
"k8s.io/client-go/util/workqueue"
4546

47+
"antrea.io/antrea/pkg/agent/cniserver"
4648
"antrea.io/antrea/pkg/agent/cniserver/ipam"
4749
cnitypes "antrea.io/antrea/pkg/agent/cniserver/types"
4850
"antrea.io/antrea/pkg/agent/interfacestore"
4951
podwatchtesting "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch/testing"
5052
"antrea.io/antrea/pkg/agent/types"
5153
crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
54+
"antrea.io/antrea/pkg/ovs/ovsconfig"
55+
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
5256
)
5357

5458
const (
@@ -213,14 +217,17 @@ func TestPodControllerRun(t *testing.T) {
213217
ctrl := gomock.NewController(t)
214218
client := fake.NewSimpleClientset()
215219
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
220+
mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl)
221+
mockOVSBridgeClient.EXPECT().GetPortList().Return(nil, nil).AnyTimes()
222+
primaryInterfaceStore := interfacestore.NewInterfaceStore()
216223
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
217224
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
218225
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)
219226
podController, _ := NewPodController(
220227
client,
221228
netdefclient,
222229
informerFactory.Core().V1().Pods().Informer(),
223-
nil, nil)
230+
nil, primaryInterfaceStore, mockOVSBridgeClient)
224231
podController.interfaceConfigurator = interfaceConfigurator
225232
podController.ipamAllocator = mockIPAM
226233
cniCache := &podController.cniCache
@@ -968,7 +975,7 @@ func TestPodControllerAddPod(t *testing.T) {
968975

969976
t.Run("updating deviceID cache per Pod", func(t *testing.T) {
970977
ctrl := gomock.NewController(t)
971-
podController, _, _ := testPodController(ctrl)
978+
podController, _, _, _ := testPodController(ctrl)
972979
_, err := podController.assignUnusedSriovVFDeviceID(podName, testNamespace, sriovResourceName1, interfaceName)
973980
_, exists := podController.vfDeviceIDUsageMap.Load(podKey)
974981
assert.True(t, exists)
@@ -984,16 +991,18 @@ func TestPodControllerAddPod(t *testing.T) {
984991

985992
func testPodController(ctrl *gomock.Controller) (
986993
*PodController, *podwatchtesting.MockIPAMAllocator,
987-
*podwatchtesting.MockInterfaceConfigurator) {
994+
*podwatchtesting.MockInterfaceConfigurator, *ovsconfigtest.MockOVSBridgeClient) {
988995
client := fake.NewSimpleClientset()
989996
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
990997
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
991998
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
992999
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)
1000+
mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl)
9931001

9941002
// PodController without event handlers.
9951003
return &PodController{
9961004
kubeClient: client,
1005+
ovsBridgeClient: mockOVSBridgeClient,
9971006
netAttachDefClient: netdefclient,
9981007
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
9991008
workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay),
@@ -1005,18 +1014,139 @@ func testPodController(ctrl *gomock.Controller) (
10051014
interfaceConfigurator: interfaceConfigurator,
10061015
ipamAllocator: mockIPAM,
10071016
interfaceStore: interfacestore.NewInterfaceStore(),
1008-
}, mockIPAM, interfaceConfigurator
1017+
}, mockIPAM, interfaceConfigurator, mockOVSBridgeClient
10091018
}
10101019

10111020
// Create a test PodController and start informerFactory.
10121021
func testPodControllerStart(ctrl *gomock.Controller) (
10131022
*PodController, *podwatchtesting.MockIPAMAllocator,
10141023
*podwatchtesting.MockInterfaceConfigurator) {
1015-
podController, mockIPAM, interfaceConfigurator := testPodController(ctrl)
1024+
podController, mockIPAM, interfaceConfigurator, _ := testPodController(ctrl)
10161025
informerFactory := informers.NewSharedInformerFactory(podController.kubeClient, resyncPeriod)
10171026
podController.podInformer = informerFactory.Core().V1().Pods().Informer()
10181027
stopCh := make(chan struct{})
10191028
informerFactory.Start(stopCh)
10201029
informerFactory.WaitForCacheSync(stopCh)
10211030
return podController, mockIPAM, interfaceConfigurator
10221031
}
1032+
1033+
// convertExternalIDMap copies an external-ID map with untyped values into a
// map[string]string. Every value must hold a string; any other dynamic type
// makes the unchecked type assertion panic. The result is always non-nil.
func convertExternalIDMap(in map[string]interface{}) map[string]string {
	converted := make(map[string]string, len(in))
	for key := range in {
		converted[key] = in[key].(string)
	}
	return converted
}
1040+
1041+
func createTestInterfaces() (map[string]string, []ovsconfig.OVSPortData, []*interfacestore.InterfaceConfig) {
1042+
uuid1 := uuid.New().String()
1043+
uuid2 := uuid.New().String()
1044+
uuid3 := uuid.New().String()
1045+
uuid4 := uuid.New().String()
1046+
1047+
p1MAC, p1IP := "11:22:33:44:55:66", "192.168.1.10"
1048+
p2MAC, p2IP := "11:22:33:44:55:77", "192.168.1.11"
1049+
1050+
p1NetMAC, _ := net.ParseMAC(p1MAC)
1051+
p1NetIP := net.ParseIP(p1IP)
1052+
p2NetMAC, _ := net.ParseMAC(p2MAC)
1053+
p2NetIP := net.ParseIP(p2IP)
1054+
1055+
// Create InterfaceConfig objects directly
1056+
containerConfig1 := interfacestore.NewContainerInterface("p1", uuid1, "Pod1", "nsA", "eth0", p1NetMAC, []net.IP{p1NetIP}, 100)
1057+
containerConfig1.OVSPortConfig = &interfacestore.OVSPortConfig{
1058+
OFPort: 11,
1059+
}
1060+
containerConfig2 := interfacestore.NewContainerInterface("p2", uuid2, "Pod2", "nsA", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)
1061+
containerConfig2.OVSPortConfig = &interfacestore.OVSPortConfig{
1062+
OFPort: 12,
1063+
}
1064+
containerConfig3 := interfacestore.NewContainerInterface("p3", uuid3, "Pod3", "nsA", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)
1065+
containerConfig3.OVSPortConfig = &interfacestore.OVSPortConfig{
1066+
OFPort: -1,
1067+
}
1068+
1069+
ovsPort1 := ovsconfig.OVSPortData{
1070+
UUID: uuid1, Name: "p1", OFPort: 11,
1071+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1072+
containerConfig1))}
1073+
1074+
ovsPort2 := ovsconfig.OVSPortData{
1075+
UUID: uuid2, Name: "p2", OFPort: 12,
1076+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1077+
containerConfig2))}
1078+
1079+
ovsPort3 := ovsconfig.OVSPortData{
1080+
UUID: uuid3, Name: "p3", OFPort: -1,
1081+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1082+
containerConfig3))}
1083+
1084+
ovsPort4 := ovsconfig.OVSPortData{
1085+
UUID: uuid4,
1086+
Name: "unknownIface",
1087+
OFPort: 20,
1088+
ExternalIDs: map[string]string{
1089+
"unknownKey": "unknownValue"}}
1090+
1091+
return map[string]string{"uuid1": uuid1, "uuid2": uuid2, "uuid3": uuid3, "uuid4": uuid4}, []ovsconfig.OVSPortData{ovsPort1, ovsPort2, ovsPort3, ovsPort4}, []*interfacestore.InterfaceConfig{containerConfig1, containerConfig2, containerConfig3}
1092+
}
1093+
1094+
func TestInitializeSecondaryInterfaceStore(t *testing.T) {
1095+
ctrl := gomock.NewController(t)
1096+
pc, _, _, mockOVSBridgeClient := testPodController(ctrl)
1097+
uuids, ovsPorts, _ := createTestInterfaces()
1098+
mockOVSBridgeClient.EXPECT().GetPortList().Return(ovsPorts, nil)
1099+
1100+
err := pc.initializeSecondaryInterfaceStore()
1101+
require.NoError(t, err, "OVS ports list successfully")
1102+
1103+
// Validate stored interfaces
1104+
require.Equal(t, 3, pc.interfaceStore.Len(), "Only valid interfaces should be stored")
1105+
1106+
_, found1 := pc.interfaceStore.GetContainerInterface(uuids["uuid1"])
1107+
assert.True(t, found1, "Interface 1 should be stored")
1108+
1109+
_, found2 := pc.interfaceStore.GetContainerInterface(uuids["uuid2"])
1110+
assert.True(t, found2, "Interface 2 should be stored")
1111+
1112+
_, found3 := pc.interfaceStore.GetContainerInterface(uuids["uuid3"])
1113+
assert.True(t, found3, "Interface 3 should be stored")
1114+
1115+
_, found4 := pc.interfaceStore.GetContainerInterface(uuids["uuid4"])
1116+
assert.False(t, found4, "Unknown interface type should not be stored")
1117+
}
1118+
1119+
func TestReconcileSecondaryInterfaces(t *testing.T) {
1120+
ctrl := gomock.NewController(t)
1121+
pc, mockIPAM, interfaceConfigurator, _ := testPodController(ctrl)
1122+
primaryStore := interfacestore.NewInterfaceStore()
1123+
_, _, containerConfigs := createTestInterfaces()
1124+
1125+
// Add interfaces to primary store
1126+
primaryStore.AddInterface(containerConfigs[0])
1127+
primaryStore.AddInterface(containerConfigs[1])
1128+
1129+
// Add interfaces to controller secondaryInterfaceStore
1130+
pc.interfaceStore.AddInterface(containerConfigs[0])
1131+
pc.interfaceStore.AddInterface(containerConfigs[1])
1132+
// Case when OFPort == -1
1133+
pc.interfaceStore.AddInterface(containerConfigs[2])
1134+
1135+
interfaceConfigurator.EXPECT().DeleteVLANSecondaryInterface(gomock.Any()).Return(nil).Times(1)
1136+
mockIPAM.EXPECT().SecondaryNetworkRelease(gomock.Any()).Return(nil).Times(1)
1137+
1138+
err := pc.reconcileSecondaryInterfaces(primaryStore)
1139+
1140+
require.NoError(t, err)
1141+
1142+
// Check CNI Cache
1143+
_, foundPod1 := pc.cniCache.Load("nsA/Pod1")
1144+
assert.True(t, foundPod1, "CNI Cache should contain nsA/Pod1")
1145+
1146+
_, foundPod2 := pc.cniCache.Load("nsA/Pod2")
1147+
assert.True(t, foundPod2, "CNI Cache should contain nsA/Pod2")
1148+
1149+
// Ensure stale interfaces are removed
1150+
_, foundPod3 := pc.cniCache.Load("nsA/Pod3")
1151+
assert.False(t, foundPod3, "Stale interface should have been removed")
1152+
}

0 commit comments

Comments
 (0)