Skip to content

Commit c57b883

Browse files
tpantelisyboaron
authored andcommitted
Handle updates to the node OVN transit switch IP
...used for NonGatewayRoutes. This addresses a TODO. Added a TransitSwitchIP interface and implementation that encapsulates the transit switch IP extracted from the local node's annotation value. The instance is shared between the NonGatewayRouteController and the NonGatewayRouteHandler. The NonGatewayRouteHandler watches for node updates and updates the TransitSwitchIP value and the NonGatewayRoutes NextHops field. The NonGatewayRouteController is a read-only consumer of TransitSwitchIP. Signed-off-by: Tom Pantelis <[email protected]>
1 parent e77f27d commit c57b883

10 files changed

+425
-104
lines changed

pkg/routeagent_driver/handlers/ovn/handler.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@ import (
4343
type NewOVSDBClientFn func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error)
4444

4545
type HandlerConfig struct {
46-
Namespace string
47-
ClusterCIDR []string
48-
ServiceCIDR []string
49-
SubmClient clientset.Interface
50-
K8sClient kubernetes.Interface
51-
DynClient dynamic.Interface
52-
WatcherConfig *watcher.Config
53-
NewOVSDBClient NewOVSDBClientFn
46+
Namespace string
47+
ClusterCIDR []string
48+
ServiceCIDR []string
49+
SubmClient clientset.Interface
50+
K8sClient kubernetes.Interface
51+
DynClient dynamic.Interface
52+
WatcherConfig *watcher.Config
53+
NewOVSDBClient NewOVSDBClientFn
54+
TransitSwitchIP TransitSwitchIPGetter
5455
}
5556

5657
type Handler struct {
@@ -124,7 +125,7 @@ func (ovn *Handler) Init() error {
124125
return err
125126
}
126127

127-
nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.K8sClient, ovn.Namespace)
128+
nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.Namespace, ovn.TransitSwitchIP)
128129
if err != nil {
129130
return err
130131
}

pkg/routeagent_driver/handlers/ovn/handler_test.go

+78-19
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ const (
5151
var _ = Describe("Handler", func() {
5252
t := newTestDriver()
5353

54-
var ovsdbClient *fakeovn.OVSDBClient
54+
var (
55+
ovsdbClient *fakeovn.OVSDBClient
56+
transitSwitchIP ovn.TransitSwitchIP
57+
)
5558

5659
BeforeEach(func() {
5760
ovsdbClient = fakeovn.NewOVSDBClient()
@@ -79,6 +82,8 @@ var _ = Describe("Handler", func() {
7982

8083
restMapper := test.GetRESTMapperFor(&submarinerv1.GatewayRoute{}, &submarinerv1.NonGatewayRoute{})
8184

85+
transitSwitchIP = ovn.NewTransitSwitchIP()
86+
8287
t.Start(ovn.NewHandler(&ovn.HandlerConfig{
8388
Namespace: testing.Namespace,
8489
ClusterCIDR: []string{clusterCIDR},
@@ -93,9 +98,11 @@ var _ = Describe("Handler", func() {
9398
NewOVSDBClient: func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error) {
9499
return ovsdbClient, nil
95100
},
101+
TransitSwitchIP: transitSwitchIP,
96102
}))
97103

98104
Expect(ovsdbClient.Connected()).To(BeTrue())
105+
Expect(transitSwitchIP.Init(t.k8sClient)).To(Succeed())
99106
})
100107

101108
When("a remote Endpoint is created, updated, and deleted", func() {
@@ -254,37 +261,89 @@ var _ = Describe("Handler", func() {
254261
})
255262
})
256263

257-
When("a NonGatewayRoute is created and deleted", func() {
264+
When("NonGatewayRoutes are created, updated and deleted", func() {
265+
verifyLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
266+
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
267+
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
268+
Match: cidr,
269+
Nexthop: ptr.To(nextHop),
270+
})
271+
}
272+
}
273+
274+
verifyNoLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
275+
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
276+
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
277+
Match: cidr,
278+
Nexthop: ptr.To(nextHop),
279+
})
280+
}
281+
}
282+
258283
It("should correctly reconcile OVN router policies", func() {
259284
client := t.dynClient.Resource(submarinerv1.SchemeGroupVersion.WithResource("nongatewayroutes")).Namespace(testing.Namespace)
260285

261-
nonGWRoute := &submarinerv1.NonGatewayRoute{
286+
By("Creating first NonGatewayRoute")
287+
288+
nextHop := "172.1.1.1"
289+
290+
nonGWRoute1 := &submarinerv1.NonGatewayRoute{
262291
ObjectMeta: metav1.ObjectMeta{
263-
Name: "test-nongateway-route",
292+
Name: "test-nongateway-route1",
264293
},
265294
RoutePolicySpec: submarinerv1.RoutePolicySpec{
266-
NextHops: []string{"111.1.1.1"},
267-
RemoteCIDRs: []string{"192.0.1.0/24", "192.0.2.0/24"},
295+
NextHops: []string{nextHop},
296+
RemoteCIDRs: []string{"111.0.1.0/24", "111.0.2.0/24"},
268297
},
269298
}
270299

271-
test.CreateResource(client, nonGWRoute)
300+
test.CreateResource(client, nonGWRoute1)
272301

273-
for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
274-
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
275-
Match: cidr,
276-
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
277-
})
278-
}
302+
verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
279303

280-
Expect(client.Delete(context.Background(), nonGWRoute.Name, metav1.DeleteOptions{})).To(Succeed())
304+
By("Creating second NonGatewayRoute")
281305

282-
for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
283-
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
284-
Match: cidr,
285-
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
286-
})
306+
nonGWRoute2 := &submarinerv1.NonGatewayRoute{
307+
ObjectMeta: metav1.ObjectMeta{
308+
Name: "test-nongateway-route2",
309+
},
310+
RoutePolicySpec: submarinerv1.RoutePolicySpec{
311+
NextHops: []string{nextHop},
312+
RemoteCIDRs: []string{"222.0.1.0/24", "222.0.2.0/24"},
313+
},
287314
}
315+
316+
test.CreateResource(client, nonGWRoute2)
317+
318+
verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
319+
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)
320+
321+
By("Updating NextHop for first NonGatewayRoute")
322+
323+
prevNextHop := nextHop
324+
nextHop = "172.1.1.2"
325+
nonGWRoute1.RoutePolicySpec.NextHops[0] = nextHop
326+
327+
test.UpdateResource(client, nonGWRoute1)
328+
329+
verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
330+
verifyNoLogicalRouterPolicies(nonGWRoute1, prevNextHop)
331+
verifyNoLogicalRouterPolicies(nonGWRoute2, prevNextHop)
332+
333+
By("Updating NextHop for second NonGatewayRoute")
334+
335+
nonGWRoute2.RoutePolicySpec.NextHops[0] = nextHop
336+
337+
test.UpdateResource(client, nonGWRoute2)
338+
339+
verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
340+
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)
341+
342+
By("Deleting first NonGatewayRoute")
343+
344+
Expect(client.Delete(context.Background(), nonGWRoute1.Name, metav1.DeleteOptions{})).To(Succeed())
345+
346+
verifyNoLogicalRouterPolicies(nonGWRoute1, nextHop)
288347
})
289348
})
290349

pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go

+5-27
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,21 @@ import (
2222
"github.com/pkg/errors"
2323
"github.com/submariner-io/admiral/pkg/watcher"
2424
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
25-
nodeutil "github.com/submariner-io/submariner/pkg/node"
2625
"k8s.io/apimachinery/pkg/runtime"
2726
"k8s.io/apimachinery/pkg/util/sets"
28-
clientset "k8s.io/client-go/kubernetes"
2927
)
3028

3129
type NonGatewayRouteController struct {
3230
nonGatewayRouteWatcher watcher.Interface
3331
connectionHandler *ConnectionHandler
3432
remoteSubnets sets.Set[string]
3533
stopCh chan struct{}
36-
transitSwitchIP string
34+
transitSwitchIP TransitSwitchIPGetter
3735
}
3836

3937
//nolint:gocritic // Ignore hugeParam
4038
func NewNonGatewayRouteController(config watcher.Config, connectionHandler *ConnectionHandler,
41-
k8sClientSet clientset.Interface, namespace string,
39+
namespace string, transitSwitchIP TransitSwitchIPGetter,
4240
) (*NonGatewayRouteController, error) {
4341
// We'll panic if config is nil, this is intentional
4442
var err error
@@ -47,6 +45,7 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
4745
connectionHandler: connectionHandler,
4846
remoteSubnets: sets.New[string](),
4947
stopCh: make(chan struct{}),
48+
transitSwitchIP: transitSwitchIP,
5049
}
5150

5251
config.ResourceConfigs = []watcher.ResourceConfig{
@@ -62,25 +61,6 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
6261
},
6362
}
6463

65-
node, err := nodeutil.GetLocalNode(k8sClientSet)
66-
if err != nil {
67-
return nil, errors.Wrap(err, "error getting the local node info")
68-
}
69-
70-
annotations := node.GetAnnotations()
71-
72-
transitSwitchIP := annotations["k8s.ovn.org/node-transit-switch-port-ifaddr"]
73-
if transitSwitchIP == "" {
74-
// This is a non-IC setup , so this controller will not be started.
75-
logger.Infof("No transit switch IP configured on node %q", node.Name)
76-
return controller, nil
77-
}
78-
79-
controller.transitSwitchIP, err = jsonToIP(transitSwitchIP)
80-
if err != nil {
81-
return nil, errors.Wrapf(err, "error parsing transit switch IP")
82-
}
83-
8464
controller.nonGatewayRouteWatcher, err = watcher.New(&config)
8565
if err != nil {
8666
return nil, errors.Wrap(err, "error creating resource watcher")
@@ -129,7 +109,7 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
129109
}
130110

131111
// If this node belongs to same zone as gateway node, ignore the event.
132-
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP {
112+
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP.Get() {
133113
for _, subnet := range submNonGWRoute.RoutePolicySpec.RemoteCIDRs {
134114
if addSubnet {
135115
g.remoteSubnets.Insert(subnet)
@@ -145,7 +125,5 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
145125
}
146126

147127
func (g *NonGatewayRouteController) stop() {
148-
if g.transitSwitchIP != "" {
149-
close(g.stopCh)
150-
}
128+
close(g.stopCh)
151129
}

pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go

+47-30
Original file line numberDiff line numberDiff line change
@@ -28,47 +28,32 @@ import (
2828
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
2929
"github.com/submariner-io/submariner/pkg/cni"
3030
"github.com/submariner-io/submariner/pkg/event"
31-
nodeutil "github.com/submariner-io/submariner/pkg/node"
32-
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
31+
corev1 "k8s.io/api/core/v1"
3332
apierrors "k8s.io/apimachinery/pkg/api/errors"
3433
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35-
clientset "k8s.io/client-go/kubernetes"
34+
"k8s.io/client-go/kubernetes"
3635
)
3736

3837
type NonGatewayRouteHandler struct {
3938
event.HandlerBase
39+
event.NodeHandlerBase
4040
smClient submarinerClientset.Interface
41-
k8sClient clientset.Interface
42-
transitSwitchIP string
41+
k8sClient kubernetes.Interface
42+
transitSwitchIP TransitSwitchIP
4343
}
4444

45-
func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient clientset.Interface) *NonGatewayRouteHandler {
45+
func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient kubernetes.Interface, transitSwitchIP TransitSwitchIP,
46+
) *NonGatewayRouteHandler {
4647
return &NonGatewayRouteHandler{
47-
smClient: smClient,
48-
k8sClient: k8sClient,
48+
smClient: smClient,
49+
k8sClient: k8sClient,
50+
transitSwitchIP: transitSwitchIP,
4951
}
5052
}
5153

5254
func (h *NonGatewayRouteHandler) Init() error {
5355
logger.Info("Starting NonGatewayRouteHandler")
54-
55-
node, err := nodeutil.GetLocalNode(h.k8sClient)
56-
if err != nil {
57-
return errors.Wrap(err, "error getting the g/w node")
58-
}
59-
60-
annotations := node.GetAnnotations()
61-
62-
// TODO transitSwitchIP changes support needs to be added.
63-
transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation]
64-
if !ok {
65-
logger.Infof("No transit switch IP configured")
66-
return nil
67-
}
68-
69-
h.transitSwitchIP, err = jsonToIP(transitSwitchIP)
70-
71-
return errors.Wrapf(err, "error parsing the transit switch IP")
56+
return errors.Wrap(h.transitSwitchIP.Init(h.k8sClient), "error initializing TransitSwitchIP")
7257
}
7358

7459
func (h *NonGatewayRouteHandler) GetName() string {
@@ -80,7 +65,7 @@ func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string {
8065
}
8166

8267
func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
83-
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
68+
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
8469
return nil
8570
}
8671

@@ -98,7 +83,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En
9883
}
9984

10085
func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
101-
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
86+
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
10287
return nil
10388
}
10489

@@ -113,7 +98,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En
11398
}
11499

115100
func (h *NonGatewayRouteHandler) TransitionToGateway() error {
116-
if h.transitSwitchIP == "" {
101+
if h.transitSwitchIP.Get() == "" {
117102
return nil
118103
}
119104

@@ -141,6 +126,38 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
141126
return nil
142127
}
143128

129+
func (h *NonGatewayRouteHandler) NodeUpdated(node *corev1.Node) error {
130+
updated, err := h.transitSwitchIP.UpdateFrom(node)
131+
if err != nil {
132+
logger.Errorf(err, "Error updating transit switch IP from node: %s", resource.ToJSON(node))
133+
return nil
134+
}
135+
136+
if !updated {
137+
return nil
138+
}
139+
140+
logger.Infof("Transit switch IP updated to %s", h.transitSwitchIP.Get())
141+
142+
if !h.State().IsOnGateway() {
143+
return nil
144+
}
145+
146+
endpoints := h.State().GetRemoteEndpoints()
147+
for i := range endpoints {
148+
err = util.Update(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoints[i].Namespace),
149+
h.newNonGatewayRoute(&endpoints[i]), func(existing *submarinerv1.NonGatewayRoute) (*submarinerv1.NonGatewayRoute, error) {
150+
existing.RoutePolicySpec.NextHops = []string{h.transitSwitchIP.Get()}
151+
return existing, nil
152+
})
153+
if err != nil {
154+
return errors.Wrapf(err, "error updating NonGatewayRoute")
155+
}
156+
}
157+
158+
return nil
159+
}
160+
144161
func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
145162
return &submarinerv1.NonGatewayRoute{
146163
ObjectMeta: metav1.ObjectMeta{
@@ -149,7 +166,7 @@ func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpo
149166
},
150167
RoutePolicySpec: submarinerv1.RoutePolicySpec{
151168
RemoteCIDRs: endpoint.Spec.Subnets,
152-
NextHops: []string{h.transitSwitchIP},
169+
NextHops: []string{h.transitSwitchIP.Get()},
153170
},
154171
}
155172
}

0 commit comments

Comments
 (0)