Skip to content

Commit 2c6411f

Browse files
committed
Adjust datastore syncer to watch the local Gateway
...instead of the local Node for global IP updates. Signed-off-by: Tom Pantelis <[email protected]>
1 parent 803f804 commit 2c6411f

File tree

4 files changed

+41
-68
lines changed

4 files changed

+41
-68
lines changed

pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go

+11-12
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
testutil "github.com/submariner-io/admiral/pkg/test"
3333
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
3434
"github.com/submariner-io/submariner/pkg/globalnet/constants"
35-
corev1 "k8s.io/api/core/v1"
3635
apierrors "k8s.io/apimachinery/pkg/api/errors"
3736
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3837
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -123,30 +122,30 @@ func testEndpointSyncing() {
123122
})
124123
})
125124

126-
When("the local Node's global IP is updated", func() {
127-
var node *corev1.Node
125+
When("the local Gateway's global IP is updated", func() {
126+
var gateway *submarinerv1.Gateway
128127

129128
BeforeEach(func() {
130-
node = &corev1.Node{
129+
gateway = &submarinerv1.Gateway{
131130
ObjectMeta: metav1.ObjectMeta{
132-
Name: nodeName,
131+
Name: t.localEndpoint.Hostname,
133132
Annotations: map[string]string{constants.SmGlobalIP: "200.0.0.40"},
134133
},
135134
}
136135

137-
test.CreateResource(t.localNodes, node)
136+
test.CreateResource(t.localGateways, gateway)
138137
})
139138

140139
JustBeforeEach(func() {
141-
t.localEndpoint.HealthCheckIP = node.Annotations[constants.SmGlobalIP]
140+
t.localEndpoint.HealthCheckIP = gateway.Annotations[constants.SmGlobalIP]
142141
awaitEndpoint(t.localEndpoints, t.localEndpoint)
143142
})
144143

145144
It("should update the local Endpoint's HealthCheckIP", func() {
146-
node.Annotations[constants.SmGlobalIP] = "200.0.0.100"
147-
t.localEndpoint.HealthCheckIP = node.Annotations[constants.SmGlobalIP]
145+
gateway.Annotations[constants.SmGlobalIP] = "200.0.0.100"
146+
t.localEndpoint.HealthCheckIP = gateway.Annotations[constants.SmGlobalIP]
148147

149-
test.UpdateResource(t.localNodes, node)
148+
test.UpdateResource(t.localGateways, gateway)
150149
awaitEndpoint(t.localEndpoints, t.localEndpoint)
151150
})
152151

@@ -155,8 +154,8 @@ func testEndpointSyncing() {
155154
Expect(t.localEndpoints.Delete(context.Background(), getEndpointName(t.localEndpoint), metav1.DeleteOptions{})).
156155
To(Succeed())
157156

158-
node.Annotations[constants.SmGlobalIP] = "200.0.0.100"
159-
test.UpdateResource(t.localNodes, node)
157+
gateway.Annotations[constants.SmGlobalIP] = "200.0.0.100"
158+
test.UpdateResource(t.localGateways, gateway)
160159

161160
testutil.EnsureNoResource(resource.ForDynamic(t.localEndpoints), getEndpointName(t.localEndpoint))
162161
})

pkg/controllers/datastoresyncer/datastoresyncer.go

+13-26
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package datastoresyncer
2020

2121
import (
2222
"context"
23-
"os"
2423

2524
"github.com/pkg/errors"
2625
"github.com/submariner-io/admiral/pkg/federate"
@@ -33,10 +32,10 @@ import (
3332
"github.com/submariner-io/submariner/pkg/cidr"
3433
"github.com/submariner-io/submariner/pkg/endpoint"
3534
"github.com/submariner-io/submariner/pkg/types"
36-
k8sv1 "k8s.io/api/core/v1"
3735
apierrors "k8s.io/apimachinery/pkg/api/errors"
3836
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3937
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
38+
"k8s.io/apimachinery/pkg/fields"
4039
"k8s.io/apimachinery/pkg/runtime"
4140
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
4241
"k8s.io/client-go/dynamic"
@@ -47,7 +46,6 @@ import (
4746
type DatastoreSyncer struct {
4847
localCluster types.SubmarinerCluster
4948
localEndpoint *endpoint.Local
50-
localNodeName string
5149
syncerConfig broker.SyncerConfig
5250
}
5351

@@ -94,8 +92,8 @@ func (d *DatastoreSyncer) Start(ctx context.Context) error {
9492
}
9593

9694
if len(d.localCluster.Spec.GlobalCIDR) > 0 {
97-
if err := d.startNodeWatcher(ctx.Done()); err != nil {
98-
return errors.WithMessage(err, "startNodeWatcher returned error")
95+
if err := d.startGatewayWatcher(ctx.Done()); err != nil {
96+
return errors.WithMessage(err, "startGatewayWatcher returned error")
9997
}
10098
}
10199

@@ -231,45 +229,34 @@ func (d *DatastoreSyncer) ensureExclusiveEndpoint(ctx context.Context, syncer *b
231229
return nil
232230
}
233231

234-
func (d *DatastoreSyncer) startNodeWatcher(stopCh <-chan struct{}) error {
235-
nodeName, ok := os.LookupEnv("NODE_NAME")
236-
if !ok {
237-
// Healthcheck in globalnet deployments will not work because of missing NODE_NAME.
238-
logger.Error(nil, "Error reading the NODE_NAME from the env, healthChecker functionality will not work.")
239-
} else {
240-
d.localNodeName = nodeName
241-
return d.createNodeWatcher(stopCh)
242-
}
243-
244-
return nil
245-
}
246-
247-
func (d *DatastoreSyncer) createNodeWatcher(stopCh <-chan struct{}) error {
232+
func (d *DatastoreSyncer) startGatewayWatcher(stopCh <-chan struct{}) error {
248233
resourceWatcher, err := watcher.New(&watcher.Config{
249234
Scheme: scheme.Scheme,
250235
RestConfig: d.syncerConfig.LocalRestConfig,
251236
RestMapper: d.syncerConfig.RestMapper,
252237
Client: d.syncerConfig.LocalClient,
253238
ResourceConfigs: []watcher.ResourceConfig{
254239
{
255-
Name: "Node watcher for datastoresyncer",
256-
ResourceType: &k8sv1.Node{},
257-
ResourcesEquivalent: d.areNodesEquivalent,
240+
Name: "Gateway watcher for datastoresyncer",
241+
ResourceType: &submarinerv1.Gateway{},
242+
SourceNamespace: d.syncerConfig.LocalNamespace,
243+
ResourcesEquivalent: d.areGatewaysEquivalent,
244+
SourceFieldSelector: fields.Set(map[string]string{"metadata.name": d.localEndpoint.Spec().Hostname}).AsSelector().String(),
258245
Handler: watcher.EventHandlerFuncs{
259-
OnCreateFunc: d.handleCreateOrUpdateNode,
260-
OnUpdateFunc: d.handleCreateOrUpdateNode,
246+
OnCreateFunc: d.handleCreateOrUpdateGateway,
247+
OnUpdateFunc: d.handleCreateOrUpdateGateway,
261248
OnDeleteFunc: nil,
262249
},
263250
},
264251
},
265252
})
266253
if err != nil {
267-
return errors.Wrap(err, "error creating resource watcher for Nodes")
254+
return errors.Wrap(err, "error creating Gateway resource watcher")
268255
}
269256

270257
err = resourceWatcher.Start(stopCh)
271258
if err != nil {
272-
return errors.Wrap(err, "error starting the resource watcher")
259+
return errors.Wrap(err, "error starting the Gateway resource watcher")
273260
}
274261

275262
return nil

pkg/controllers/datastoresyncer/datastoresyncer_suite_test.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package datastoresyncer_test
2121
import (
2222
"context"
2323
"fmt"
24-
"os"
2524
"reflect"
2625
"testing"
2726
"time"
@@ -53,7 +52,6 @@ const (
5352
otherClusterID = "west"
5453
localNamespace = "submariner"
5554
brokerNamespace = "submariner-broker"
56-
nodeName = "raiders"
5755
)
5856

5957
func TestDatastoresyncer(t *testing.T) {
@@ -83,7 +81,7 @@ type testDriver struct {
8381
localClusters dynamic.ResourceInterface
8482
brokerClusters dynamic.ResourceInterface
8583
localEndpoints dynamic.ResourceInterface
86-
localNodes dynamic.ResourceInterface
84+
localGateways dynamic.ResourceInterface
8785
brokerEndpoints dynamic.ResourceInterface
8886
syncerScheme *runtime.Scheme
8987
restMapper meta.RESTMapper
@@ -128,7 +126,7 @@ func newTestDriver() *testDriver {
128126
t.brokerClient = dynamicfake.NewSimpleDynamicClient(t.syncerScheme)
129127
fake.AddBasicReactors(&t.brokerClient.Fake)
130128

131-
t.restMapper = test.GetRESTMapperFor(&submarinerv1.Cluster{}, &submarinerv1.Endpoint{}, &corev1.Node{})
129+
t.restMapper = test.GetRESTMapperFor(&submarinerv1.Cluster{}, &submarinerv1.Endpoint{}, &submarinerv1.Gateway{}, &corev1.Node{})
132130

133131
clusterGVR := test.GetGroupVersionResourceFor(t.restMapper, &submarinerv1.Cluster{})
134132
t.localClusters = t.localClient.Resource(*clusterGVR).Namespace(localNamespace)
@@ -138,7 +136,8 @@ func newTestDriver() *testDriver {
138136
t.localEndpoints = t.localClient.Resource(*endpointGVR).Namespace(localNamespace)
139137
t.brokerEndpoints = t.brokerClient.Resource(*endpointGVR).Namespace(brokerNamespace)
140138

141-
t.localNodes = t.localClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &corev1.Node{})).Namespace("")
139+
t.localGateways = t.localClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &submarinerv1.Gateway{})).
140+
Namespace(localNamespace)
142141
})
143142

144143
JustBeforeEach(func() {
@@ -153,8 +152,6 @@ func newTestDriver() *testDriver {
153152
}
154153

155154
func (t *testDriver) run() {
156-
os.Setenv("NODE_NAME", nodeName)
157-
158155
t.syncer = datastoresyncer.New(&broker.SyncerConfig{
159156
LocalClient: t.localClient,
160157
LocalNamespace: localNamespace,

pkg/controllers/datastoresyncer/node_handler.go renamed to pkg/controllers/datastoresyncer/gateway_handler.go

+13-23
Original file line numberDiff line numberDiff line change
@@ -22,44 +22,34 @@ import (
2222
"context"
2323
"net"
2424

25+
"github.com/submariner-io/admiral/pkg/resource"
2526
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
2627
"github.com/submariner-io/submariner/pkg/globalnet/constants"
27-
k8sv1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2929
"k8s.io/apimachinery/pkg/runtime"
3030
)
3131

32-
func (d *DatastoreSyncer) handleCreateOrUpdateNode(obj runtime.Object, _ int) bool {
33-
node := obj.(*k8sv1.Node)
34-
if node.Name != d.localNodeName {
35-
return false
36-
}
37-
38-
globalIPOfNode := node.GetAnnotations()[constants.SmGlobalIP]
32+
func (d *DatastoreSyncer) handleCreateOrUpdateGateway(obj runtime.Object, _ int) bool {
33+
globalIP := resource.MustToMeta(obj).GetAnnotations()[constants.SmGlobalIP]
3934

40-
// Validate that globalIPOfNode falls in the globalCIDR allocated to the cluster.
41-
if globalIPOfNode != "" {
35+
// Validate that the global IP falls in the global CIDR allocated to the cluster.
36+
if globalIP != "" {
4237
_, ipnet, err := net.ParseCIDR(d.localCluster.Spec.GlobalCIDR[0])
4338
if err != nil {
4439
// Ideally this will not happen as globalCIDR is expected to be a valid CIDR.
4540
logger.Errorf(err, "Error parsing the GlobalCIDR %q", d.localCluster.Spec.GlobalCIDR)
4641
return false
4742
}
4843

49-
if ipnet.Contains(net.ParseIP(globalIPOfNode)) {
50-
return d.updateLocalEndpointIfNecessary(globalIPOfNode)
44+
if ipnet.Contains(net.ParseIP(globalIP)) {
45+
return d.updateLocalEndpointIfNecessary(globalIP)
5146
}
5247
}
5348

5449
return false
5550
}
5651

57-
func (d *DatastoreSyncer) areNodesEquivalent(obj1, obj2 *unstructured.Unstructured) bool {
58-
if obj1.GetName() != d.localNodeName {
59-
// Ignore this event. We are only interested in active GatewayNode events.
60-
return true
61-
}
62-
52+
func (d *DatastoreSyncer) areGatewaysEquivalent(obj1, obj2 *unstructured.Unstructured) bool {
6353
existingGlobalIP := obj1.GetAnnotations()[constants.SmGlobalIP]
6454
newGlobalIP := obj2.GetAnnotations()[constants.SmGlobalIP]
6555

@@ -70,16 +60,16 @@ func (d *DatastoreSyncer) areNodesEquivalent(obj1, obj2 *unstructured.Unstructur
7060
return existingGlobalIP == newGlobalIP
7161
}
7262

73-
func (d *DatastoreSyncer) updateLocalEndpointIfNecessary(globalIPOfNode string) bool {
63+
func (d *DatastoreSyncer) updateLocalEndpointIfNecessary(globalIP string) bool {
7464
spec := d.localEndpoint.Spec()
75-
if spec.HealthCheckIP != globalIPOfNode {
76-
logger.Infof("Updating the endpoint HealthCheckIP to globalIP %q", globalIPOfNode)
65+
if spec.HealthCheckIP != globalIP {
66+
logger.Infof("Updating the endpoint HealthCheckIP to globalIP %q", globalIP)
7767

7868
err := d.localEndpoint.Update(context.TODO(), func(existing *submarinerv1.EndpointSpec) {
79-
existing.HealthCheckIP = globalIPOfNode
69+
existing.HealthCheckIP = globalIP
8070
})
8171
if err != nil {
82-
logger.Warningf("Error updating the local submariner Endpoint with HealthcheckIP %s: %v", globalIPOfNode, err)
72+
logger.Warningf("Error updating the local submariner Endpoint with HealthcheckIP %s: %v", globalIP, err)
8373
return true
8474
}
8575
}

0 commit comments

Comments
 (0)