@@ -61,25 +61,31 @@ func (self *etcdPlugin) RegisterService(serviceInfo ServiceInfo) error {
61
61
return nil
62
62
}
63
63
64
- // List all end points for a service
64
+ // GetService lists all end points for a service
65
65
func (self * etcdPlugin ) GetService (name string ) ([]ServiceInfo , error ) {
66
66
keyName := "/contiv.io/service/" + name + "/"
67
67
68
+ _ , srvcList , err := self .getServiceState (keyName )
69
+ return srvcList , err
70
+ }
71
+
72
+ func (self * etcdPlugin ) getServiceState (key string ) (uint64 , []ServiceInfo , error ) {
73
+
68
74
// Get the object from etcd client
69
- resp , err := self .client .Get (keyName , true , true )
75
+ resp , err := self .client .Get (key , true , true )
70
76
if err != nil {
71
77
if strings .Contains (err .Error (), "Key not found" ) {
72
- return nil , nil
78
+ return 0 , nil , nil
73
79
} else {
74
- log .Errorf ("Error getting key %s. Err: %v" , keyName , err )
75
- return nil , err
80
+ log .Errorf ("Error getting key %s. Err: %v" , key , err )
81
+ return 0 , nil , err
76
82
}
77
83
78
84
}
79
85
80
86
if ! resp .Node .Dir {
81
87
log .Errorf ("Err. Response is not a directory: %+v" , resp .Node )
82
- return nil , errors .New ("Invalid Response from etcd" )
88
+ return 0 , nil , errors .New ("Invalid Response from etcd" )
83
89
}
84
90
85
91
srvcList := make ([]ServiceInfo , 0 )
@@ -91,13 +97,35 @@ func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
91
97
err = json .Unmarshal ([]byte (node .Value ), & respSrvc )
92
98
if err != nil {
93
99
log .Errorf ("Error parsing object %s, Err %v" , node .Value , err )
94
- return nil , err
100
+ return 0 , nil , err
95
101
}
96
102
97
103
srvcList = append (srvcList , respSrvc )
98
104
}
99
105
100
- return srvcList , nil
106
+ watchIndex := resp .EtcdIndex + 1
107
+ return watchIndex , srvcList , nil
108
+ }
109
+
110
+ // initServiceState reads the current state and injects it to the channel
111
+ // additionally, it returns the next index to watch
112
+ func (self * etcdPlugin ) initServiceState (key string , eventCh chan WatchServiceEvent ) (uint64 , error ) {
113
+ mIndex , srvcList , err := self .getServiceState (key )
114
+ if err != nil {
115
+ return mIndex , err
116
+ }
117
+
118
+ // walk each service and inject it as an add event
119
+ for _ , srvInfo := range srvcList {
120
+ log .Infof ("Sending service add event: %+v" , srvInfo )
121
+ // Send Add event
122
+ eventCh <- WatchServiceEvent {
123
+ EventType : WatchServiceEventAdd ,
124
+ ServiceInfo : srvInfo ,
125
+ }
126
+ }
127
+
128
+ return mIndex , nil
101
129
}
102
130
103
131
// Watch for a service
@@ -111,9 +139,16 @@ func (self *etcdPlugin) WatchService(name string,
111
139
112
140
// Start the watch thread
113
141
go func () {
114
- log .Infof ("Watching for service: %s" , keyName )
142
+ // Get current state and etcd index to watch
143
+ watchIndex , err := self .initServiceState (keyName , eventCh )
144
+ if err != nil {
145
+ log .Fatalf ("Unable to watch service key: %s - %v" , keyName ,
146
+ err )
147
+ }
148
+
149
+ log .Infof ("Watching for service: %s at index %v" , keyName , watchIndex )
115
150
// Start the watch
116
- _ , err : = self .client .Watch (keyName , 0 , true , watchCh , watchStopCh )
151
+ _ , err = self .client .Watch (keyName , watchIndex , true , watchCh , watchStopCh )
117
152
if (err != nil ) && (err != etcd .ErrWatchStoppedByUser ) {
118
153
log .Errorf ("Error watching service %s. Err: %v" , keyName , err )
119
154
0 commit comments