@@ -128,31 +128,42 @@ func (d *MasterDaemon) registerService() {
128
128
log .Infof ("Registered netmaster service with registry" )
129
129
}
130
130
131
- // Find all netplugin nodes and register them
132
- func (d * MasterDaemon ) registerNetpluginNodes () error {
133
- // Get all netplugin services
134
- srvList , err := master .ObjdbClient .GetService ("netplugin" )
131
+ // Find all netplugin nodes and add them to ofnet master
132
+ func (d * MasterDaemon ) agentDiscoveryLoop () {
133
+
134
+ // Create channels for watch thread
135
+ agentEventCh := make (chan objdb.WatchServiceEvent , 1 )
136
+ watchStopCh := make (chan bool , 1 )
137
+
138
+ // Start a watch on netplugin service
139
+ err := master .ObjdbClient .WatchService ("netplugin" , agentEventCh , watchStopCh )
135
140
if err != nil {
136
- log .Errorf ("Error getting netplugin nodes. Err: %v" , err )
137
- return err
141
+ log .Fatalf ("Could not start a watch on netplugin service. Err: %v" , err )
138
142
}
139
143
140
- // Add each node
141
- for _ , srv := range srvList {
144
+ for {
145
+ agentEv := <- agentEventCh
146
+ log .Debugf ("Received netplugin watch event: %+v" , agentEv )
142
147
// build host info
143
148
nodeInfo := ofnet.OfnetNode {
144
- HostAddr : srv .HostAddr ,
145
- HostPort : uint16 (srv .Port ),
149
+ HostAddr : agentEv . ServiceInfo .HostAddr ,
150
+ HostPort : uint16 (agentEv . ServiceInfo .Port ),
146
151
}
147
152
148
- // Add the node
149
- err = d .ofnetMaster .AddNode (nodeInfo )
150
- if err != nil {
151
- log .Errorf ("Error adding node %v. Err: %v" , srv , err )
153
+ if agentEv .EventType == objdb .WatchServiceEventAdd {
154
+ err = d .ofnetMaster .AddNode (nodeInfo )
155
+ if err != nil {
156
+ log .Errorf ("Error adding node %v. Err: %v" , nodeInfo , err )
157
+ }
158
+ } else if agentEv .EventType == objdb .WatchServiceEventDel {
159
+ var res bool
160
+ log .Infof ("Unregister node %+v" , nodeInfo )
161
+ d .ofnetMaster .UnRegisterNode (& nodeInfo , & res )
152
162
}
153
- }
154
163
155
- return nil
164
+ // Dont process next peer event for another 100ms
165
+ time .Sleep (100 * time .Millisecond )
166
+ }
156
167
}
157
168
158
169
// registerRoutes registers HTTP route handlers
@@ -338,7 +349,7 @@ func (d *MasterDaemon) RunMasterFsm() {
338
349
}
339
350
340
351
// Register all existing netplugins in the background
341
- go d .registerNetpluginNodes ()
352
+ go d .agentDiscoveryLoop ()
342
353
343
354
// Create the lock
344
355
leaderLock , err = master .ObjdbClient .NewLock ("netmaster/leader" , localIP , leaderLockTTL )
0 commit comments