Skip to content

Commit b1791c2

Browse files
authored
Handle etcd/consul restarts (#14)
* Handle etcd/consul restart * Retry etcd/consul ops
1 parent e2e99ef commit b1791c2

6 files changed

+195
-99
lines changed

client_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,9 @@ func testServiceRegisterDeregister(t *testing.T, dbClient API) {
554554
}
555555
log.Infof("Registered service: %+v", service2Info)
556556

557+
// Wait for a second for registration to happen in background
558+
time.Sleep(time.Second)
559+
557560
resp, err := dbClient.GetService("athena")
558561
if err != nil {
559562
t.Fatalf("Fatal getting service. Err: %+v\n", err)

consulClient.go

+67-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"strings"
2222
"sync"
23+
"time"
2324

2425
"github.com/hashicorp/consul/api"
2526

@@ -39,6 +40,9 @@ type ConsulClient struct {
3940
serviceDb map[string]*consulServiceState
4041
}
4142

43+
// Max times to retry
44+
const maxConsulRetries = 10
45+
4246
// init Register the plugin
4347
func init() {
4448
RegisterPlugin("consul", &consulPlugin{mutex: new(sync.Mutex)})
@@ -88,7 +92,23 @@ func (cp *ConsulClient) GetObj(key string, retVal interface{}) error {
8892

8993
resp, _, err := cp.client.KV().Get(key, &api.QueryOptions{RequireConsistent: true})
9094
if err != nil {
91-
return err
95+
if api.IsServerError(err) || strings.Contains(err.Error(), "EOF") ||
96+
strings.Contains(err.Error(), "connection refused") {
97+
for i := 0; i < maxConsulRetries; i++ {
98+
resp, _, err = cp.client.KV().Get(key, &api.QueryOptions{RequireConsistent: true})
99+
if err == nil {
100+
break
101+
}
102+
103+
// Retry after a delay
104+
time.Sleep(time.Second)
105+
}
106+
}
107+
108+
// return error if it failed after retries
109+
if err != nil {
110+
return err
111+
}
92112
}
93113
// Consul returns success and a nil kv when a key is not found,
94114
// translate it to 'Key not found' error
@@ -111,7 +131,23 @@ func (cp *ConsulClient) ListDir(key string) ([]string, error) {
111131

112132
kvs, _, err := cp.client.KV().List(key, nil)
113133
if err != nil {
114-
return nil, err
134+
if api.IsServerError(err) || strings.Contains(err.Error(), "EOF") ||
135+
strings.Contains(err.Error(), "connection refused") {
136+
for i := 0; i < maxConsulRetries; i++ {
137+
kvs, _, err = cp.client.KV().List(key, nil)
138+
if err == nil {
139+
break
140+
}
141+
142+
// Retry after a delay
143+
time.Sleep(time.Second)
144+
}
145+
}
146+
147+
// return error if it failed after retries
148+
if err != nil {
149+
return nil, err
150+
}
115151
}
116152
// Consul returns success and a nil kv when a key is not found,
117153
// translate it to 'Key not found' error
@@ -139,6 +175,20 @@ func (cp *ConsulClient) SetObj(key string, value interface{}) error {
139175
}
140176

141177
_, err = cp.client.KV().Put(&api.KVPair{Key: key, Value: jsonVal}, nil)
178+
if err != nil {
179+
if api.IsServerError(err) || strings.Contains(err.Error(), "EOF") ||
180+
strings.Contains(err.Error(), "connection refused") {
181+
for i := 0; i < maxConsulRetries; i++ {
182+
_, err = cp.client.KV().Put(&api.KVPair{Key: key, Value: jsonVal}, nil)
183+
if err == nil {
184+
break
185+
}
186+
187+
// Retry after a delay
188+
time.Sleep(time.Second)
189+
}
190+
}
191+
}
142192

143193
return err
144194
}
@@ -147,11 +197,20 @@ func (cp *ConsulClient) SetObj(key string, value interface{}) error {
147197
func (cp *ConsulClient) DelObj(key string) error {
148198
key = processKey("/contiv.io/obj/" + processKey(key))
149199
_, err := cp.client.KV().Delete(key, nil)
150-
return err
151-
}
200+
if err != nil {
201+
if api.IsServerError(err) || strings.Contains(err.Error(), "EOF") ||
202+
strings.Contains(err.Error(), "connection refused") {
203+
for i := 0; i < maxConsulRetries; i++ {
204+
_, err = cp.client.KV().Delete(key, nil)
205+
if err == nil {
206+
break
207+
}
208+
209+
// Retry after a delay
210+
time.Sleep(time.Second)
211+
}
212+
}
213+
}
152214

153-
// GetLocalAddr gets local address of the host
154-
func (cp *ConsulClient) GetLocalAddr() (string, error) {
155-
log.Panic("Calling unsupported API")
156-
return "", nil
215+
return err
157216
}

consulService.go

+44-35
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"strconv"
23+
"strings"
2324
"time"
2425

2526
log "github.com/Sirupsen/logrus"
@@ -171,48 +172,56 @@ func (cp *ConsulClient) WatchService(srvName string, eventCh chan WatchServiceEv
171172
case <-stopCh:
172173
return
173174
default:
174-
}
175-
176-
srvList, lastIdx, err = cp.getServiceInstances(keyName, lastIdx)
177-
if err != nil {
178-
log.Errorf("Error getting service instances for (%s): Err: %v", srvName, err)
179-
} else {
180-
log.Debugf("Got consul srv list: {%+v}. Curr: {%+v}", srvList, currSrvMap)
181-
var newSrvMap = make(map[string]ServiceInfo)
182-
183-
// Check if there are any new services
184-
for _, srvInfo := range srvList {
185-
srvKey := srvInfo.HostAddr + ":" + strconv.Itoa(srvInfo.Port)
186-
187-
// If the entry didnt exists previously, trigger add event
188-
if _, ok := currSrvMap[srvKey]; !ok {
189-
log.Debugf("Sending add event for srv: %v", srvInfo)
190-
eventCh <- WatchServiceEvent{
191-
EventType: WatchServiceEventAdd,
192-
ServiceInfo: srvInfo,
193-
}
175+
// Read the service instances
176+
srvList, lastIdx, err = cp.getServiceInstances(keyName, lastIdx)
177+
if err != nil {
178+
if api.IsServerError(err) || strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "connection refused") {
179+
log.Warnf("Consul service watch: server error: %v Retrying..", err)
180+
} else {
181+
log.Errorf("Error getting service instances for (%s): Err: %v. Exiting watch", srvName, err)
194182
}
195183

196-
// create new service map
197-
newSrvMap[srvKey] = srvInfo
198-
}
184+
// Wait a little and continue
185+
time.Sleep(5 * time.Second)
186+
continue
187+
} else {
188+
log.Debugf("Got consul srv list: {%+v}. Curr: {%+v}", srvList, currSrvMap)
189+
var newSrvMap = make(map[string]ServiceInfo)
190+
191+
// Check if there are any new services
192+
for _, srvInfo := range srvList {
193+
srvKey := srvInfo.HostAddr + ":" + strconv.Itoa(srvInfo.Port)
194+
195+
// If the entry didnt exists previously, trigger add event
196+
if _, ok := currSrvMap[srvKey]; !ok {
197+
log.Debugf("Sending add event for srv: %v", srvInfo)
198+
eventCh <- WatchServiceEvent{
199+
EventType: WatchServiceEventAdd,
200+
ServiceInfo: srvInfo,
201+
}
202+
}
199203

200-
// for all entries in old service list, see if we need to delete any
201-
for _, srvInfo := range currSrvMap {
202-
srvKey := srvInfo.HostAddr + ":" + strconv.Itoa(srvInfo.Port)
204+
// create new service map
205+
newSrvMap[srvKey] = srvInfo
206+
}
203207

204-
// if the entry does not exists in new list, delete it
205-
if _, ok := newSrvMap[srvKey]; !ok {
206-
log.Debugf("Sending delete event for srv: %v", srvInfo)
207-
eventCh <- WatchServiceEvent{
208-
EventType: WatchServiceEventDel,
209-
ServiceInfo: srvInfo,
208+
// for all entries in old service list, see if we need to delete any
209+
for _, srvInfo := range currSrvMap {
210+
srvKey := srvInfo.HostAddr + ":" + strconv.Itoa(srvInfo.Port)
211+
212+
// if the entry does not exists in new list, delete it
213+
if _, ok := newSrvMap[srvKey]; !ok {
214+
log.Debugf("Sending delete event for srv: %v", srvInfo)
215+
eventCh <- WatchServiceEvent{
216+
EventType: WatchServiceEventDel,
217+
ServiceInfo: srvInfo,
218+
}
210219
}
211220
}
212-
}
213221

214-
// set new srv map as the current
215-
currSrvMap = newSrvMap
222+
// set new srv map as the current
223+
currSrvMap = newSrvMap
224+
}
216225
}
217226
}
218227
}()

etcdClient.go

+71-48
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import (
2020
"errors"
2121
"io/ioutil"
2222
"net/http"
23-
"strings"
2423
"sync"
24+
"time"
2525

2626
"golang.org/x/net/context"
2727

@@ -50,6 +50,9 @@ type memData struct {
5050
Members []member `json:"members"`
5151
}
5252

53+
// Max retry count
54+
const maxEtcdRetries = 10
55+
5356
// Register the plugin
5457
func init() {
5558
RegisterPlugin("etcd", &etcdPlugin{mutex: new(sync.Mutex)})
@@ -102,8 +105,22 @@ func (ep *EtcdClient) GetObj(key string, retVal interface{}) error {
102105
// Get the object from etcd client
103106
resp, err := ep.kapi.Get(context.Background(), keyName, &client.GetOptions{Quorum: true})
104107
if err != nil {
105-
log.Errorf("Error getting key %s. Err: %v", keyName, err)
106-
return err
108+
// Retry few times if cluster is unavailable
109+
if err.Error() == client.ErrClusterUnavailable.Error() {
110+
for i := 0; i < maxEtcdRetries; i++ {
111+
resp, err = ep.kapi.Get(context.Background(), keyName, &client.GetOptions{Quorum: true})
112+
if err == nil {
113+
break
114+
}
115+
116+
// Retry after a delay
117+
time.Sleep(time.Second)
118+
}
119+
}
120+
if err != nil {
121+
log.Errorf("Error getting key %s. Err: %v", keyName, err)
122+
return err
123+
}
107124
}
108125

109126
// Parse JSON response
@@ -142,7 +159,21 @@ func (ep *EtcdClient) ListDir(key string) ([]string, error) {
142159
// Get the object from etcd client
143160
resp, err := ep.kapi.Get(context.Background(), keyName, &getOpts)
144161
if err != nil {
145-
return nil, nil
162+
// Retry few times if cluster is unavailable
163+
if err.Error() == client.ErrClusterUnavailable.Error() {
164+
for i := 0; i < maxEtcdRetries; i++ {
165+
resp, err = ep.kapi.Get(context.Background(), keyName, &getOpts)
166+
if err == nil {
167+
break
168+
}
169+
170+
// Retry after a delay
171+
time.Sleep(time.Second)
172+
}
173+
}
174+
if err != nil {
175+
return nil, nil
176+
}
146177
}
147178

148179
if !resp.Node.Dir {
@@ -172,9 +203,24 @@ func (ep *EtcdClient) SetObj(key string, value interface{}) error {
172203
}
173204

174205
// Set it via etcd client
175-
if _, err := ep.kapi.Set(context.Background(), keyName, string(jsonVal[:]), nil); err != nil {
176-
log.Errorf("Error setting key %s, Err: %v", keyName, err)
177-
return err
206+
_, err = ep.kapi.Set(context.Background(), keyName, string(jsonVal[:]), nil)
207+
if err != nil {
208+
// Retry few times if cluster is unavailable
209+
if err.Error() == client.ErrClusterUnavailable.Error() {
210+
for i := 0; i < maxEtcdRetries; i++ {
211+
_, err = ep.kapi.Set(context.Background(), keyName, string(jsonVal[:]), nil)
212+
if err == nil {
213+
break
214+
}
215+
216+
// Retry after a delay
217+
time.Sleep(time.Second)
218+
}
219+
}
220+
if err != nil {
221+
log.Errorf("Error setting key %s, Err: %v", keyName, err)
222+
return err
223+
}
178224
}
179225

180226
return nil
@@ -185,9 +231,24 @@ func (ep *EtcdClient) DelObj(key string) error {
185231
keyName := "/contiv.io/obj/" + key
186232

187233
// Remove it via etcd client
188-
if _, err := ep.kapi.Delete(context.Background(), keyName, nil); err != nil {
189-
log.Errorf("Error removing key %s, Err: %v", keyName, err)
190-
return err
234+
_, err := ep.kapi.Delete(context.Background(), keyName, nil)
235+
if err != nil {
236+
// Retry few times if cluster is unavailable
237+
if err.Error() == client.ErrClusterUnavailable.Error() {
238+
for i := 0; i < maxEtcdRetries; i++ {
239+
_, err = ep.kapi.Delete(context.Background(), keyName, nil)
240+
if err == nil {
241+
break
242+
}
243+
244+
// Retry after a delay
245+
time.Sleep(time.Second)
246+
}
247+
}
248+
if err != nil {
249+
log.Errorf("Error removing key %s, Err: %v", keyName, err)
250+
return err
251+
}
191252
}
192253

193254
return nil
@@ -215,41 +276,3 @@ func httpGetJSON(url string, data interface{}) (interface{}, error) {
215276

216277
return data, nil
217278
}
218-
219-
// GetLocalAddr Return the local address where etcd is listening
220-
func (ep *EtcdClient) GetLocalAddr() (string, error) {
221-
var epData struct {
222-
Name string `json:"name"`
223-
}
224-
225-
log.Panic("Calling unsupported API")
226-
227-
// Get ep state from etcd
228-
if _, err := httpGetJSON("http://localhost:2379/v2/stats/self", &epData); err != nil {
229-
log.Errorf("Error getting self state. Err: %v", err)
230-
return "", errors.New("Error getting self state")
231-
}
232-
233-
var memData memData
234-
235-
// Get member list from etcd
236-
if _, err := httpGetJSON("http://localhost:2379/v2/members", &memData); err != nil {
237-
log.Errorf("Error getting members state. Err: %v", err)
238-
return "", errors.New("Error getting members state")
239-
}
240-
241-
myName := epData.Name
242-
members := memData.Members
243-
244-
for _, mem := range members {
245-
if mem.Name == myName {
246-
for _, clientURL := range mem.ClientURLs {
247-
hostStr := strings.TrimPrefix(clientURL, "http://")
248-
hostAddr := strings.Split(hostStr, ":")[0]
249-
log.Infof("Got host addr: %s", hostAddr)
250-
return hostAddr, nil
251-
}
252-
}
253-
}
254-
return "", errors.New("Address not found")
255-
}

0 commit comments

Comments
 (0)