Skip to content

Commit 803f804

Browse files
committed
Modify globalnet to annotate the local Gateway with the global IP
...instead of the local Node. Signed-off-by: Tom Pantelis <[email protected]>
1 parent a33540b commit 803f804

9 files changed

+326
-239
lines changed

pkg/globalnet/controllers/base_controllers.go

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ func (c *baseSyncerController) Start() error {
7272
return c.resourceSyncer.Start(c.stopCh) //nolint:wrapcheck // Let the caller wrap it
7373
}
7474

75+
// Stop shuts the controller down in two steps: it first calls Stop on the
// embedded baseController and then calls AwaitStopped on the resource syncer.
// NOTE(review): AwaitStopped presumably blocks until the syncer has fully
// stopped, which would make Stop synchronous - confirm against the syncer API.
func (c *baseSyncerController) Stop() {
76+
c.baseController.Stop()
77+
c.resourceSyncer.AwaitStopped()
78+
}
79+
7580
func (c *baseSyncerController) reconcile(client dynamic.ResourceInterface, labelSelector, fieldSelector string,
7681
transform func(obj *unstructured.Unstructured) runtime.Object,
7782
) {

pkg/globalnet/controllers/controllers_suite_test.go

+28-19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"fmt"
2424
"net"
25+
"os"
2526
"reflect"
2627
"testing"
2728
"time"
@@ -107,21 +108,23 @@ type testDriverBase struct {
107108
pool *ipam.IPPool
108109
localSubnets []string
109110
globalCIDR string
111+
hostName string
110112
globalEgressIPs dynamic.ResourceInterface
111113
clusterGlobalEgressIPs dynamic.ResourceInterface
112114
globalIngressIPs dynamic.ResourceInterface
113115
services dynamic.ResourceInterface
114116
serviceExports dynamic.ResourceInterface
115117
endpoints dynamic.ResourceInterface
116118
pods dynamic.NamespaceableResourceInterface
117-
nodes dynamic.ResourceInterface
119+
gateways dynamic.ResourceInterface
118120
watches *fakeDynClient.WatchReactor
119121
}
120122

121123
func newTestDriverBase() *testDriverBase {
122124
t := &testDriverBase{
123-
restMapper: test.GetRESTMapperFor(&submarinerv1.Endpoint{}, &corev1.Service{}, &corev1.Node{}, &corev1.Pod{}, &corev1.Endpoints{},
124-
&submarinerv1.GlobalEgressIP{}, &submarinerv1.ClusterGlobalEgressIP{}, &submarinerv1.GlobalIngressIP{}, &mcsv1a1.ServiceExport{}),
125+
restMapper: test.GetRESTMapperFor(&submarinerv1.Endpoint{}, &corev1.Service{}, &corev1.Pod{}, &corev1.Endpoints{},
126+
&submarinerv1.GlobalEgressIP{}, &submarinerv1.ClusterGlobalEgressIP{}, &submarinerv1.GlobalIngressIP{},
127+
&submarinerv1.Gateway{}, &mcsv1a1.ServiceExport{}),
125128
scheme: runtime.NewScheme(),
126129
pFilter: fakePF.New(),
127130
globalCIDR: globalCIDR,
@@ -150,7 +153,12 @@ func newTestDriverBase() *testDriverBase {
150153

151154
t.serviceExports = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &mcsv1a1.ServiceExport{})).Namespace(namespace)
152155

153-
t.nodes = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &corev1.Node{}))
156+
t.gateways = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &submarinerv1.Gateway{})).Namespace(namespace)
157+
158+
var err error
159+
160+
t.hostName, err = os.Hostname()
161+
Expect(err).To(Succeed())
154162

155163
return t
156164
}
@@ -264,26 +272,30 @@ func (t *testDriverBase) getGlobalIngressIPStatus(name string) *submarinerv1.Glo
264272
return status
265273
}
266274

267-
func (t *testDriverBase) createNode(name, globalIP string) *corev1.Node {
268-
node := &corev1.Node{
275+
func (t *testDriverBase) createGateway(name, globalIP string) *submarinerv1.Gateway {
276+
gateway := &submarinerv1.Gateway{
269277
ObjectMeta: metav1.ObjectMeta{
270278
Name: name,
271279
},
272280
}
273281

274-
addAnnotation(node, constants.SmGlobalIP, globalIP)
282+
addAnnotation(gateway, constants.SmGlobalIP, globalIP)
275283

276-
return test.CreateResource(t.nodes, node)
284+
return test.CreateResource(t.gateways, gateway)
277285
}
278286

279-
func (t *testDriverBase) awaitNodeGlobalIP(oldIP string) string {
287+
// getGatewayGlobalIP fetches the Gateway resource with the given name through
// the test driver's gateways client, asserts that the Get call succeeded, and
// returns the value of its constants.SmGlobalIP annotation. The result is the
// empty string when the Gateway carries no such annotation.
func (t *testDriverBase) getGatewayGlobalIP(name string) string {
288+
obj, err := t.gateways.Get(context.TODO(), name, metav1.GetOptions{})
289+
Expect(err).To(Succeed())
290+
291+
return obj.GetAnnotations()[constants.SmGlobalIP]
292+
}
293+
294+
func (t *testDriverBase) awaitGatewayGlobalIP(oldIP string) string {
280295
var globalIP string
281296

282297
Eventually(func() string {
283-
obj, err := t.nodes.Get(context.TODO(), nodeName, metav1.GetOptions{})
284-
Expect(err).To(Succeed())
285-
286-
globalIP = obj.GetAnnotations()[constants.SmGlobalIP]
298+
globalIP = t.getGatewayGlobalIP(t.hostName)
287299
return globalIP
288300
}, 5).ShouldNot(Or(BeEmpty(), Equal(oldIP)))
289301

@@ -295,13 +307,10 @@ func (t *testDriverBase) awaitNodeGlobalIP(oldIP string) string {
295307
return globalIP
296308
}
297309

298-
func (t *testDriverBase) ensureNoNodeGlobalIP() {
310+
func (t *testDriverBase) ensureGatewayGlobalIP(name, ip string) {
299311
Consistently(func() string {
300-
obj, err := t.nodes.Get(context.TODO(), nodeName, metav1.GetOptions{})
301-
Expect(err).To(Succeed())
302-
303-
return obj.GetAnnotations()[constants.SmGlobalIP]
304-
}, 500*time.Millisecond).Should(BeEmpty())
312+
return t.getGatewayGlobalIP(name)
313+
}, 500*time.Millisecond).Should(Equal(ip))
305314
}
306315

307316
func addAnnotation(obj metav1.Object, key, value string) {

pkg/globalnet/controllers/gateway_controller.go

+80-66
Original file line numberDiff line numberDiff line change
@@ -25,142 +25,147 @@ import (
2525
"github.com/submariner-io/admiral/pkg/federate"
2626
"github.com/submariner-io/admiral/pkg/syncer"
2727
admUtil "github.com/submariner-io/admiral/pkg/util"
28-
"github.com/submariner-io/submariner/pkg/cni"
28+
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
2929
"github.com/submariner-io/submariner/pkg/globalnet/constants"
30-
packetfilter "github.com/submariner-io/submariner/pkg/globalnet/controllers/packetfilter"
30+
"github.com/submariner-io/submariner/pkg/globalnet/controllers/packetfilter"
3131
"github.com/submariner-io/submariner/pkg/ipam"
32-
corev1 "k8s.io/api/core/v1"
3332
"k8s.io/apimachinery/pkg/api/meta"
3433
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3534
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3635
"k8s.io/apimachinery/pkg/runtime"
36+
"k8s.io/client-go/tools/cache"
3737
)
3838

39-
func NewNodeController(config *syncer.ResourceSyncerConfig, pool *ipam.IPPool, nodeName string, clusterCIDRs []string) (Interface, error) {
39+
func NewGatewayController(config *syncer.ResourceSyncerConfig, informer cache.SharedInformer, pool *ipam.IPPool, hostName,
40+
namespace, cniIP string,
41+
) (Interface, error) {
4042
// We'll panic if config is nil, this is intentional
4143
var err error
4244

43-
logger.Info("Creating Node controller")
45+
logger.Info("Creating Gateway controller")
4446

4547
pfIface, err := packetfilter.New()
4648
if err != nil {
4749
return nil, errors.Wrap(err, "error creating the PacketFilter Interface handler")
4850
}
4951

50-
controller := &nodeController{
52+
controller := &gatewayController{
5153
baseIPAllocationController: newBaseIPAllocationController(pool, pfIface),
52-
nodeName: nodeName,
54+
hostName: hostName,
55+
cniIP: cniIP,
5356
}
5457

55-
cniIface, err := cni.Discover(clusterCIDRs)
56-
if err == nil {
57-
controller.cniIP = cniIface.IPAddress
58+
config = NewGatewayResourceSyncerConfig(config, namespace)
5859

59-
logger.Infof("Discovered CNI interface IP %q", controller.cniIP)
60-
} else {
61-
logger.Errorf(err, "Error obtaining CNI IP address - health check functionality will not work")
62-
}
63-
64-
federator := federate.NewUpdateFederator(config.SourceClient, config.RestMapper, corev1.NamespaceAll,
60+
config.Federator = federate.NewUpdateFederator(config.SourceClient, config.RestMapper, namespace,
6561
func(oldObj *unstructured.Unstructured, newObj *unstructured.Unstructured) *unstructured.Unstructured {
66-
return updateNodeAnnotation(oldObj, newObj.GetAnnotations()[constants.SmGlobalIP]).(*unstructured.Unstructured)
62+
return updateGlobalIPAnnotation(oldObj, newObj.GetAnnotations()[constants.SmGlobalIP]).(*unstructured.Unstructured)
6763
})
6864

69-
controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{
70-
Name: "Node syncer",
71-
ResourceType: &corev1.Node{},
72-
SourceClient: config.SourceClient,
73-
SourceNamespace: corev1.NamespaceAll,
74-
RestMapper: config.RestMapper,
75-
Federator: federator,
76-
Scheme: config.Scheme,
77-
Transform: controller.process,
78-
})
65+
config.Transform = controller.process
66+
config.Name = "Gateway syncer"
67+
68+
controller.resourceSyncer, err = syncer.NewResourceSyncerWithSharedInformer(config, informer)
7969
if err != nil {
80-
return nil, errors.Wrap(err, "error creating the federator")
70+
return nil, errors.Wrap(err, "error creating the resource syncer")
8171
}
8272

83-
_, gvr, err := admUtil.ToUnstructuredResource(&corev1.Node{}, config.RestMapper)
73+
_, gvr, err := admUtil.ToUnstructuredResource(&submarinerv1.Gateway{}, config.RestMapper)
8474
if err != nil {
8575
return nil, errors.Wrap(err, "error converting resource")
8676
}
8777

88-
controller.nodes = config.SourceClient.Resource(*gvr)
78+
gatewayClient := config.SourceClient.Resource(*gvr).Namespace(namespace)
79+
80+
// Gateways retain their allocated global IPs regardless of their HA status (active or passive) so reserve the global IPs
81+
// for all the Gateways.
8982

90-
localNodeInfo, err := controller.nodes.Get(context.TODO(), controller.nodeName, metav1.GetOptions{})
83+
gateways, err := gatewayClient.List(context.TODO(), metav1.ListOptions{})
9184
if err != nil {
92-
return nil, errors.Wrapf(err, "error retrieving local Node %q", controller.nodeName)
85+
return nil, errors.Wrap(err, "error listing Gateways")
9386
}
9487

95-
if err := controller.reserveAllocatedIP(federator, localNodeInfo); err != nil {
96-
return nil, err
88+
for i := range gateways.Items {
89+
if err := controller.reserveAllocatedIP(config.Federator, &gateways.Items[i]); err != nil {
90+
return nil, err
91+
}
9792
}
9893

9994
return controller, nil
10095
}
10196

102-
func (n *nodeController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) {
103-
node := from.(*corev1.Node)
104-
105-
// If the event corresponds to a different node which has globalIP annotation, release the globalIP back to Pool.
106-
if node.Name != n.nodeName {
107-
if existingGlobalIP := node.GetAnnotations()[constants.SmGlobalIP]; existingGlobalIP != "" {
108-
logger.Infof("Processing %sd non-gateway node %q - releasing GlobalIP %q", op, node.Name, existingGlobalIP)
97+
// NewGatewayResourceSyncerConfig returns a new ResourceSyncerConfig for
// submarinerv1.Gateway resources in the given namespace. SourceClient,
// RestMapper and Scheme are copied from base; no other field of base is read
// and base itself is not modified. Fields not listed in the literal (for
// example Name, Transform and Federator) are left at their zero values for the
// caller to fill in.
func NewGatewayResourceSyncerConfig(base *syncer.ResourceSyncerConfig, namespace string) *syncer.ResourceSyncerConfig {
98+
return &syncer.ResourceSyncerConfig{
99+
ResourceType: &submarinerv1.Gateway{},
100+
SourceClient: base.SourceClient,
101+
SourceNamespace: namespace,
102+
RestMapper: base.RestMapper,
103+
Scheme: base.Scheme,
104+
// Every old/new pair is reported as equivalent.
// NOTE(review): presumably this makes the syncer skip update events - confirm against the syncer API.
ResourcesEquivalent: func(_, _ *unstructured.Unstructured) bool {
105+
return true // we don't need to handle updates
106+
},
107+
}
108+
}
109109

110-
if op == syncer.Delete {
111-
_ = n.pool.Release(existingGlobalIP)
110+
func (n *gatewayController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) {
111+
gateway := from.(*submarinerv1.Gateway)
112112

113-
return nil, false
114-
}
113+
if op == syncer.Delete {
114+
// Gateway deleted - here we just release its global IP, if any. If it's the local Gateway then the other Gateway controller
115+
// started by the gatewayMonitor will remove the ingress rules in parallel.
116+
if existingGlobalIP := gateway.GetAnnotations()[constants.SmGlobalIP]; existingGlobalIP != "" {
117+
logger.Infof(" Gateway %q deleted - releasing its global IP %q", gateway.Name, existingGlobalIP)
115118

116119
_ = n.pool.Release(existingGlobalIP)
117-
118-
return updateNodeAnnotation(node, ""), false
119120
}
120121

121122
return nil, false
122123
}
123124

124-
logger.Infof("Processing %sd Node %q", op, node.Name)
125+
if gateway.Name != n.hostName {
126+
return nil, false
127+
}
128+
129+
logger.Infof("Processing %sd local Gateway %q", op, gateway.Name)
125130

126-
return n.allocateIP(node)
131+
return n.allocateIP(gateway)
127132
}
128133

129-
func (n *nodeController) allocateIP(node *corev1.Node) (runtime.Object, bool) {
134+
func (n *gatewayController) allocateIP(gateway *submarinerv1.Gateway) (runtime.Object, bool) {
130135
if n.cniIP == "" {
131136
// To support connectivity from HostNetwork to remote clusters, globalnet requires the CNI IP of the local node.
132137
return nil, false
133138
}
134139

135-
globalIP := node.GetAnnotations()[constants.SmGlobalIP]
140+
globalIP := gateway.GetAnnotations()[constants.SmGlobalIP]
136141
if globalIP != "" {
137142
return nil, false
138143
}
139144

140145
ips, err := n.pool.Allocate(1)
141146
if err != nil {
142-
logger.Errorf(err, "Error allocating IPs for node %q", node.Name)
147+
logger.Errorf(err, "Error allocating IPs for Gateway %q", gateway.Name)
143148
return nil, true
144149
}
145150

146151
globalIP = ips[0]
147152

148-
logger.Infof("Allocated global IP %s for node %q", globalIP, node.Name)
153+
logger.Infof("Allocated global IP %s for Gateway %q", globalIP, gateway.Name)
149154

150-
logger.Infof("Adding ingress rules for node %q with global IP %s, CNI IP %s", node.Name, globalIP, n.cniIP)
155+
logger.Infof("Adding ingress rules for Gateway %q with global IP %s, CNI IP %s", gateway.Name, globalIP, n.cniIP)
151156

152157
if err := n.pfIface.AddIngressRulesForHealthCheck(n.cniIP, globalIP); err != nil {
153-
logger.Errorf(err, "Error programming rules for Gateway healthcheck on node %q", node.Name)
158+
logger.Errorf(err, "Error programming ingress rules for Gateway %q", gateway.Name)
154159

155160
_ = n.pool.Release(globalIP)
156161

157162
return nil, true
158163
}
159164

160-
return updateNodeAnnotation(node, globalIP), false
165+
return updateGlobalIPAnnotation(gateway, globalIP), false
161166
}
162167

163-
func (n *nodeController) reserveAllocatedIP(federator federate.Federator, obj *unstructured.Unstructured) error {
168+
func (n *gatewayController) reserveAllocatedIP(federator federate.Federator, obj *unstructured.Unstructured) error {
164169
existingGlobalIP := obj.GetAnnotations()[constants.SmGlobalIP]
165170
if existingGlobalIP == "" {
166171
return nil
@@ -170,32 +175,41 @@ func (n *nodeController) reserveAllocatedIP(federator federate.Federator, obj *u
170175
return nil
171176
}
172177

178+
// If this is not the Gateway on the local host then just reserve its global IP.
179+
173180
err := n.pool.Reserve(existingGlobalIP)
174-
if err == nil {
181+
if err == nil && obj.GetName() == n.hostName {
175182
err = n.pfIface.AddIngressRulesForHealthCheck(n.cniIP, existingGlobalIP)
176183
if err != nil {
177184
_ = n.pool.Release(existingGlobalIP)
178185
}
179186
}
180187

181188
if err != nil {
182-
logger.Warningf("Could not reserve allocated GlobalIP for Node %q: %v", obj.GetName(), err)
189+
logger.Warningf("Could not reserve allocated GlobalIP for Gateway %q: %v", obj.GetName(), err)
190+
191+
// If this is not the Gateway on the local host/node then leave the annotation as is since we're not able to remove the
192+
// ingress rules from another node. We'll let the globalnet process running on that host take care of it.
193+
194+
if obj.GetName() != n.hostName {
195+
return nil
196+
}
183197

184198
if err := n.pfIface.RemoveIngressRulesForHealthCheck(n.cniIP, existingGlobalIP); err != nil {
185-
logger.Errorf(err, "Error deleting rules for Gateway healthcheck on node %q", n.nodeName)
199+
logger.Errorf(err, "Error deleting rules for Gateway %q", n.hostName)
186200
}
187201

188-
return errors.Wrap(federator.Distribute(context.TODO(), updateNodeAnnotation(obj, "")),
189-
"error updating the Node global IP annotation")
202+
return errors.Wrap(federator.Distribute(context.TODO(), updateGlobalIPAnnotation(obj, "")),
203+
"error updating the Gateway global IP annotation")
190204
}
191205

192-
logger.Infof("Successfully reserved allocated GlobalIP %q for node %q", existingGlobalIP, obj.GetName())
206+
logger.Infof("Successfully reserved allocated GlobalIP %q for Gateway %q", existingGlobalIP, obj.GetName())
193207

194208
return nil
195209
}
196210

197-
func updateNodeAnnotation(node runtime.Object, globalIP string) runtime.Object {
198-
objMeta, _ := meta.Accessor(node)
211+
func updateGlobalIPAnnotation(obj runtime.Object, globalIP string) runtime.Object {
212+
objMeta, _ := meta.Accessor(obj)
199213

200214
annotations := objMeta.GetAnnotations()
201215
if annotations == nil {
@@ -210,5 +224,5 @@ func updateNodeAnnotation(node runtime.Object, globalIP string) runtime.Object {
210224

211225
objMeta.SetAnnotations(annotations)
212226

213-
return node
227+
return obj
214228
}

0 commit comments

Comments
 (0)