Skip to content

Commit d36b6be

Browse files
author
Joji Mekkatt
committed
Use EtcdIndex as reference for watch
1 parent 2afee8d commit d36b6be

File tree

1 file changed

+34
-16
lines changed

1 file changed

+34
-16
lines changed

etcdService.go

+34-16
Original file line numberDiff line numberDiff line change
@@ -61,25 +61,31 @@ func (self *etcdPlugin) RegisterService(serviceInfo ServiceInfo) error {
6161
return nil
6262
}
6363

64-
// List all end points for a service
64+
// GetService lists all end points for a service
6565
func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
6666
keyName := "/contiv.io/service/" + name + "/"
6767

68+
_, srvcList, err := self.getServiceState(keyName)
69+
return srvcList, err
70+
}
71+
72+
func (self *etcdPlugin) getServiceState(key string) (uint64, []ServiceInfo, error) {
73+
6874
// Get the object from etcd client
69-
resp, err := self.client.Get(keyName, true, true)
75+
resp, err := self.client.Get(key, true, true)
7076
if err != nil {
7177
if strings.Contains(err.Error(), "Key not found") {
72-
return nil, nil
78+
return 0, nil, nil
7379
} else {
74-
log.Errorf("Error getting key %s. Err: %v", keyName, err)
75-
return nil, err
80+
log.Errorf("Error getting key %s. Err: %v", key, err)
81+
return 0, nil, err
7682
}
7783

7884
}
7985

8086
if !resp.Node.Dir {
8187
log.Errorf("Err. Response is not a directory: %+v", resp.Node)
82-
return nil, errors.New("Invalid Response from etcd")
88+
return 0, nil, errors.New("Invalid Response from etcd")
8389
}
8490

8591
srvcList := make([]ServiceInfo, 0)
@@ -91,23 +97,35 @@ func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
9197
err = json.Unmarshal([]byte(node.Value), &respSrvc)
9298
if err != nil {
9399
log.Errorf("Error parsing object %s, Err %v", node.Value, err)
94-
return nil, err
100+
return 0, nil, err
95101
}
96102

97103
srvcList = append(srvcList, respSrvc)
98104
}
99105

100-
return srvcList, nil
106+
watchIndex := resp.EtcdIndex + 1
107+
return watchIndex, srvcList, nil
101108
}
102109

103-
func (self *etcdPlugin) getCurrentIndex(key string) (uint64, error) {
104-
// Get the object from etcd client
105-
resp, err := self.client.Get(key, true, false)
110+
// initServiceState reads the current state and injects it to the channel
111+
// additionally, it returns the next index to watch
112+
func (self *etcdPlugin) initServiceState(key string, eventCh chan WatchServiceEvent) (uint64, error) {
113+
mIndex, srvcList, err := self.getServiceState(key)
106114
if err != nil {
107-
return 0, err
115+
return mIndex, err
108116
}
109117

110-
return resp.Node.ModifiedIndex, nil
118+
// walk each service and inject it as an add event
119+
for _, srvInfo := range srvcList {
120+
log.Infof("Sending service add event: %+v", srvInfo)
121+
// Send Add event
122+
eventCh <- WatchServiceEvent{
123+
EventType: WatchServiceEventAdd,
124+
ServiceInfo: srvInfo,
125+
}
126+
}
127+
128+
return mIndex, nil
111129
}
112130

113131
// Watch for a service
@@ -121,9 +139,9 @@ func (self *etcdPlugin) WatchService(name string,
121139

122140
// Start the watch thread
123141
go func() {
124-
// Watch from current index to force a read of the initial state
125-
watchIndex, err := self.getCurrentIndex(keyName)
126-
if (err != nil) {
142+
// Get current state and etcd index to watch
143+
watchIndex, err := self.initServiceState(keyName, eventCh)
144+
if err != nil {
127145
log.Fatalf("Unable to watch service key: %s - %v", keyName,
128146
err)
129147
}

0 commit comments

Comments
 (0)