Skip to content

Commit 6305497

Browse files
author
Joji Mekkatt
committed
Fixes Issue #575
1 parent c1c5f41 commit 6305497

File tree

10 files changed

+303
-60
lines changed

10 files changed

+303
-60
lines changed

netmaster/daemon/daemon.go

+28-17
Original file line numberDiff line numberDiff line change
@@ -128,31 +128,42 @@ func (d *MasterDaemon) registerService() {
128128
log.Infof("Registered netmaster service with registry")
129129
}
130130

131-
// Find all netplugin nodes and register them
132-
func (d *MasterDaemon) registerNetpluginNodes() error {
133-
// Get all netplugin services
134-
srvList, err := master.ObjdbClient.GetService("netplugin")
131+
// Find all netplugin nodes and add them to ofnet master
132+
func (d *MasterDaemon) agentDiscoveryLoop() {
133+
134+
// Create channels for watch thread
135+
agentEventCh := make(chan objdb.WatchServiceEvent, 1)
136+
watchStopCh := make(chan bool, 1)
137+
138+
// Start a watch on netplugin service
139+
err := master.ObjdbClient.WatchService("netplugin", agentEventCh, watchStopCh)
135140
if err != nil {
136-
log.Errorf("Error getting netplugin nodes. Err: %v", err)
137-
return err
141+
log.Fatalf("Could not start a watch on netplugin service. Err: %v", err)
138142
}
139143

140-
// Add each node
141-
for _, srv := range srvList {
144+
for {
145+
agentEv := <-agentEventCh
146+
log.Debugf("Received netplugin watch event: %+v", agentEv)
142147
// build host info
143148
nodeInfo := ofnet.OfnetNode{
144-
HostAddr: srv.HostAddr,
145-
HostPort: uint16(srv.Port),
149+
HostAddr: agentEv.ServiceInfo.HostAddr,
150+
HostPort: uint16(agentEv.ServiceInfo.Port),
146151
}
147152

148-
// Add the node
149-
err = d.ofnetMaster.AddNode(nodeInfo)
150-
if err != nil {
151-
log.Errorf("Error adding node %v. Err: %v", srv, err)
153+
if agentEv.EventType == objdb.WatchServiceEventAdd {
154+
err = d.ofnetMaster.AddNode(nodeInfo)
155+
if err != nil {
156+
log.Errorf("Error adding node %v. Err: %v", nodeInfo, err)
157+
}
158+
} else if agentEv.EventType == objdb.WatchServiceEventDel {
159+
var res bool
160+
log.Infof("Unregister node %+v", nodeInfo)
161+
d.ofnetMaster.UnRegisterNode(&nodeInfo, &res)
152162
}
153-
}
154163

155-
return nil
164+
// Dont process next peer event for another 100ms
165+
time.Sleep(100 * time.Millisecond)
166+
}
156167
}
157168

158169
// registerRoutes registers HTTP route handlers
@@ -338,7 +349,7 @@ func (d *MasterDaemon) RunMasterFsm() {
338349
}
339350

340351
// Register all existing netplugins in the background
341-
go d.registerNetpluginNodes()
352+
go d.agentDiscoveryLoop()
342353

343354
// Create the lock
344355
leaderLock, err = master.ObjdbClient.NewLock("netmaster/leader", localIP, leaderLockTTL)

netplugin/cluster/cluster.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ import (
3535
// This file implements netplugin <-> netmaster clustering
3636

3737
const (
38-
netmasterRPCPort = 9001
39-
netpluginRPCPort = 9002
40-
vxlanUDPPort = 4789
38+
netmasterRPCPort = 9001
39+
netpluginRPCPort1 = 9002
40+
netpluginRPCPort2 = 9003
41+
vxlanUDPPort = 4789
4142
)
4243

4344
// ObjdbClient client
@@ -194,7 +195,7 @@ func registerService(objClient objdb.API, ctrlIP, vtepIP, hostname string) error
194195
ServiceName: "netplugin",
195196
TTL: 10,
196197
HostAddr: ctrlIP,
197-
Port: netpluginRPCPort,
198+
Port: netpluginRPCPort1,
198199
Hostname: hostname,
199200
}
200201

@@ -205,6 +206,21 @@ func registerService(objClient objdb.API, ctrlIP, vtepIP, hostname string) error
205206
return err
206207
}
207208

209+
srvInfo = objdb.ServiceInfo{
210+
ServiceName: "netplugin",
211+
TTL: 10,
212+
HostAddr: ctrlIP,
213+
Port: netpluginRPCPort2,
214+
Hostname: hostname,
215+
}
216+
217+
// Register the node with service registry
218+
err = objClient.RegisterService(srvInfo)
219+
if err != nil {
220+
log.Fatalf("Error registering service. Err: %v", err)
221+
return err
222+
}
223+
208224
// netplugn VTEP service info
209225
srvInfo = objdb.ServiceInfo{
210226
ServiceName: "netplugin.vtep",

test/systemtests/cfg.json

+1-29
Original file line numberDiff line numberDiff line change
@@ -1,29 +1 @@
1-
[
2-
{
3-
"scheduler" : "docker",
4-
"swarm_variable":"DOCKER_HOST=192.168.2.10:2375",
5-
"platform" : "vagrant",
6-
"product" : "netplugin",
7-
"aci_mode" : "off",
8-
"short" : false,
9-
"containers" : 3,
10-
"iterations" : 2,
11-
"enableDNS" : false,
12-
"contiv_cluster_store" : "etcd://localhost:2379",
13-
"contiv_l3" : "",
14-
"key_file" : "/home/admin/.ssh/id_rsa",
15-
"binpath" : "/opt/gopath/bin",
16-
17-
"hostips" : "192.168.2.10,192.168.2.11,192.168.2.12",
18-
"hostusernames" : "admin,admin,admin",
19-
"dataInterface" : "eth2",
20-
"mgmtInterface" : "eth1",
21-
"vlan" : "1120-1150",
22-
"vxlan" : "1-10000",
23-
"subnet" : "10.1.1.0/24",
24-
"gateway" : "10.1.1.254",
25-
"network" : "MyNet",
26-
"tenant": "MyTenant",
27-
"encap" : "vlan"
28-
}
29-
]
1+
{"binpath": "/opt/gopath/bin", "iterations": 3, "contiv_cluster_store": "etcd://localhost:2379", "gateway": "10.1.1.254", "subnet": "10.1.1.0/24", "network": "TestNet", "platform": "vagrant", "encap": "vlan", "vxlan": "1-10000", "product": "netplugin", "swarm_variable": "", "vlan": "1120-1150", "aci_mode": "off", "dataInterface": "eth2", "scheduler": "docker", "enableDNS": false, "tenant": "TestTenant", "short": false, "mgmtInterface": "eth1", "hostusernames": "admin,admin,admin", "hostips": "192.168.2.10,192.168.2.11,192.168.2.12", "containers": 3, "contiv_l3": 0, "key_file": "/home/admin/.ssh/id_rsa"}

test/systemtests/docker_test.go

+60-1
Original file line numberDiff line numberDiff line change
@@ -607,8 +607,59 @@ func (d *docker) waitForListeners() error {
607607
return d.node.runCommandWithTimeOut("netstat -tlpn | grep 9090 | grep LISTEN", 500*time.Millisecond, 50*time.Second)
608608
}
609609

610+
func (d *docker) verifyAgents(agentIPs map[string]bool) (string, error) {
611+
var data interface{}
612+
actAgents := make(map[string]uint32)
613+
614+
// read agent information from inspect
615+
cmd := "curl -s localhost:9999/debug/ofnet | python -mjson.tool"
616+
str, err := d.node.tbnode.RunCommandWithOutput(cmd)
617+
if err != nil {
618+
return "", err
619+
}
620+
621+
err = json.Unmarshal([]byte(str), &data)
622+
if err != nil {
623+
logrus.Errorf("Unmarshal error: %v", err)
624+
return str, err
625+
}
626+
627+
dd := data.(map[string]interface{})
628+
adb := dd["AgentDb"].(map[string]interface{})
629+
for key := range adb {
630+
actAgents[key] = 1
631+
}
632+
633+
// build expected agentRpc
634+
rpcSet := []string{":9002", ":9003"}
635+
expAgents := make(map[string]uint32)
636+
for agent := range agentIPs {
637+
for _, rpc := range rpcSet {
638+
k := agent + rpc
639+
expAgents[k] = 1
640+
}
641+
}
642+
643+
for agent := range expAgents {
644+
_, found := actAgents[agent]
645+
if !found {
646+
return str, errors.New("Agent " + agent + " not found")
647+
}
648+
}
649+
650+
// verify there are no extraneous Agents
651+
for agent := range actAgents {
652+
_, found := expAgents[agent]
653+
if !found {
654+
return str, errors.New("Unexpected Agent " + agent + " found on " + d.node.Name())
655+
}
656+
}
657+
658+
return "", nil
659+
}
610660
func (d *docker) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
611661
var data interface{}
662+
localVtep := ""
612663
actVTEPs := make(map[string]uint32)
613664

614665
// read vtep information from inspect
@@ -648,7 +699,7 @@ func (d *docker) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
648699
if found {
649700
switch l.(type) {
650701
case string:
651-
localVtep := l.(string)
702+
localVtep = l.(string)
652703
actVTEPs[localVtep] = 1
653704
}
654705
}
@@ -660,6 +711,14 @@ func (d *docker) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
660711
}
661712
}
662713

714+
// verify there are no extraneous VTEPs
715+
for vtep := range actVTEPs {
716+
_, found := expVTEPS[vtep]
717+
if !found {
718+
return str, errors.New("Unexpected VTEP " + vtep + " found on " + localVtep)
719+
}
720+
}
721+
663722
return "", nil
664723
}
665724
func (d *docker) verifyEPs(epList []string) (string, error) {

test/systemtests/k8setup_test.go

+65-1
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,61 @@ func (k *kubernetes) waitForListeners() error {
623623
return k.node.runCommandWithTimeOut("netstat -tlpn | grep 9090 | grep LISTEN", 500*time.Millisecond, 50*time.Second)
624624
}
625625

626+
func (k *kubernetes) verifyAgents(agentIPs map[string]bool) (string, error) {
627+
if k.node.Name() != "k8master" {
628+
return "", nil
629+
}
630+
631+
var data interface{}
632+
actAgents := make(map[string]uint32)
633+
634+
// read vtep information from inspect
635+
cmd := "curl -s localhost:9999/debug/ofnet | python -mjson.tool"
636+
str, err := k.node.tbnode.RunCommandWithOutput(cmd)
637+
if err != nil {
638+
return "", err
639+
}
640+
641+
err = json.Unmarshal([]byte(str), &data)
642+
if err != nil {
643+
logrus.Errorf("Unmarshal error: %v", err)
644+
return str, err
645+
}
646+
647+
dd := data.(map[string]interface{})
648+
adb := dd["AgentDb"].(map[string]interface{})
649+
for key := range adb {
650+
actAgents[key] = 1
651+
}
652+
653+
// build expected agentRpc
654+
rpcSet := []string{":9002", ":9003"}
655+
expAgents := make(map[string]uint32)
656+
for agent := range agentIPs {
657+
for _, rpc := range rpcSet {
658+
k := agent + rpc
659+
expAgents[k] = 1
660+
}
661+
}
662+
663+
for agent := range expAgents {
664+
_, found := actAgents[agent]
665+
if !found {
666+
return str, errors.New("Agent " + agent + " not found")
667+
}
668+
}
669+
670+
// verify there are no extraneous Agents
671+
for agent := range actAgents {
672+
_, found := expAgents[agent]
673+
if !found {
674+
return str, errors.New("Unexpected Agent " + agent + " found on " + k.node.Name())
675+
}
676+
}
677+
678+
return "", nil
679+
}
680+
626681
func (k *kubernetes) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
627682
var data interface{}
628683
actVTEPs := make(map[string]uint32)
@@ -662,11 +717,12 @@ func (k *kubernetes) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
662717
}
663718

664719
// read local ip
720+
localVtep := ""
665721
l, found := vt["LocalIp"]
666722
if found {
667723
switch l.(type) {
668724
case string:
669-
localVtep := l.(string)
725+
localVtep = l.(string)
670726
actVTEPs[localVtep] = 1
671727
}
672728
}
@@ -678,6 +734,14 @@ func (k *kubernetes) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
678734
}
679735
}
680736

737+
// verify there are no extraneous VTEPs
738+
for vtep := range actVTEPs {
739+
_, found := expVTEPS[vtep]
740+
if !found {
741+
return str, errors.New("Unexpected VTEP " + vtep + " found on " + localVtep)
742+
}
743+
}
744+
681745
return "", nil
682746
}
683747
func (k *kubernetes) verifyEPs(epList []string) (string, error) {

test/systemtests/node_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ func (n *node) waitForListeners() error {
196196
return n.exec.waitForListeners()
197197
}
198198

199+
func (n *node) verifyAgentDB(expAgents map[string]bool) (string, error) {
200+
return n.exec.verifyAgents(expAgents)
201+
}
202+
199203
func (n *node) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
200204
return n.exec.verifyVTEPs(expVTEPS)
201205
}

test/systemtests/scheduler_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type systemTestScheduler interface {
3939
checkSchedulerNetworkCreated(nwName string, expectedOp bool) error
4040
waitForListeners() error
4141
verifyVTEPs(expVTEPS map[string]bool) (string, error)
42+
verifyAgents(expVTEPS map[string]bool) (string, error)
4243
verifyEPs(epList []string) (string, error)
4344
reloadNode(n *node) error
4445
getMasterIP() (string, error)

test/systemtests/swarm_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,58 @@ func (w *swarm) waitForListeners() error {
569569
return w.node.runCommandWithTimeOut("netstat -tlpn | grep 9090 | grep LISTEN", 500*time.Millisecond, 50*time.Second)
570570
}
571571

572+
func (w *swarm) verifyAgents(agentIPs map[string]bool) (string, error) {
573+
574+
var data interface{}
575+
actAgents := make(map[string]uint32)
576+
577+
// read vtep information from inspect
578+
cmd := "curl -s localhost:9999/debug/ofnet | python -mjson.tool"
579+
str, err := w.node.tbnode.RunCommandWithOutput(cmd)
580+
if err != nil {
581+
return "", err
582+
}
583+
584+
err = json.Unmarshal([]byte(str), &data)
585+
if err != nil {
586+
logrus.Errorf("Unmarshal error: %v", err)
587+
return str, err
588+
}
589+
590+
dd := data.(map[string]interface{})
591+
adb := dd["AgentDb"].(map[string]interface{})
592+
for key := range adb {
593+
actAgents[key] = 1
594+
}
595+
596+
// build expected agentRpc
597+
rpcSet := []string{":9002", ":9003"}
598+
expAgents := make(map[string]uint32)
599+
for agent := range agentIPs {
600+
for _, rpc := range rpcSet {
601+
k := agent + rpc
602+
expAgents[k] = 1
603+
}
604+
}
605+
606+
for agent := range expAgents {
607+
_, found := actAgents[agent]
608+
if !found {
609+
return str, errors.New("Agent " + agent + " not found")
610+
}
611+
}
612+
613+
// verify there are no extraneous Agents
614+
for agent := range actAgents {
615+
_, found := expAgents[agent]
616+
if !found {
617+
return str, errors.New("Unexpected Agent " + agent + " found on " + w.node.Name())
618+
}
619+
}
620+
621+
return "", nil
622+
}
623+
572624
func (w *swarm) verifyVTEPs(expVTEPS map[string]bool) (string, error) {
573625
var data interface{}
574626
actVTEPs := make(map[string]uint32)

0 commit comments

Comments
 (0)