Skip to content

Commit b9df55d

Browse files
authored
fix consul locking (#19)
fix consul services; fix network partitioning issues
1 parent 75ae6c7 commit b9df55d

File tree

3 files changed

+103
-40
lines changed

3 files changed

+103
-40
lines changed

consulLock.go

+70-37
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ type consulLock struct {
3232
isAcquired bool
3333
isReleased bool
3434
ttl string
35-
timeout uint64
3635
sessionID string
3736
eventChan chan LockEvent
3837
stopChan chan struct{}
@@ -57,31 +56,15 @@ func (cp *ConsulClient) NewLock(name string, myID string, ttl uint64) (LockInter
5756

5857
// Acquire a lock
5958
func (lk *consulLock) Acquire(timeout uint64) error {
60-
// session configuration
61-
sessCfg := api.SessionEntry{
62-
Name: lk.keyName,
63-
Behavior: "release",
64-
LockDelay: 10 * time.Millisecond,
65-
TTL: lk.ttl,
66-
}
67-
6859
// Create consul session
69-
sessionID, _, err := lk.client.Session().CreateNoChecks(&sessCfg, nil)
60+
err := lk.createSession()
7061
if err != nil {
7162
log.Errorf("Error Creating session for lock %s. Err: %v", lk.keyName, err)
7263
return err
7364
}
7465

75-
log.Infof("Created session: %s for lock %s/%s", sessionID, lk.name, lk.myID)
76-
77-
// save the session ID for later
78-
lk.mutex.Lock()
79-
lk.timeout = timeout
80-
lk.sessionID = sessionID
81-
lk.mutex.Unlock()
82-
8366
// Refresh the session in background
84-
go lk.client.Session().RenewPeriodic(lk.ttl, sessionID, nil, lk.stopChan)
67+
go lk.renewSession()
8568

8669
// Watch for changes on the lock
8770
go lk.acquireLock()
@@ -105,18 +88,18 @@ func (lk *consulLock) Acquire(timeout uint64) error {
10588

10689
// Release a lock
10790
func (lk *consulLock) Release() error {
108-
lk.mutex.Lock()
109-
defer lk.mutex.Unlock()
11091

11192
// Mark this as released
93+
lk.mutex.Lock()
11294
lk.isReleased = true
95+
lk.mutex.Unlock()
11396

11497
// Send stop signal on stop channel
11598
close(lk.stopChan)
11699

117100
// If the lock was acquired, release it
118-
if lk.isAcquired {
119-
lk.isAcquired = false
101+
if lk.IsAcquired() {
102+
lk.setAcquired(false)
120103

121104
// Release it via consul client
122105
succ, _, err := lk.client.KV().Release(&api.KVPair{Key: lk.keyName, Value: []byte(lk.myID), Session: lk.sessionID}, nil)
@@ -164,6 +147,13 @@ func (lk *consulLock) IsAcquired() bool {
164147
return lk.isAcquired
165148
}
166149

150+
// IsReleased Checks if the lock is released
151+
func (lk *consulLock) IsReleased() bool {
152+
lk.mutex.Lock()
153+
defer lk.mutex.Unlock()
154+
return lk.isReleased
155+
}
156+
167157
// GetHolder Gets current lock holder's ID
168158
func (lk *consulLock) GetHolder() string {
169159
lk.mutex.Lock()
@@ -206,25 +196,18 @@ func (lk *consulLock) acquireLock() {
206196

207197
log.Debugf("Got lock(%s) watch Resp: %+v", lk.myID, resp)
208198

209-
// check if we are holding the lock
210-
lk.mutex.Lock()
211199
// exit the loop if lock is released
212-
if lk.isReleased {
200+
if lk.IsReleased() {
213201
log.Infof("Lock is released. exiting watch")
214-
lk.mutex.Unlock()
215202
return
216203
}
217204

218-
isAcquired := lk.isAcquired
219-
lk.mutex.Unlock()
220-
221-
if isAcquired {
205+
// check if we are holding the lock
206+
if lk.IsAcquired() {
222207
// check if we lost the lock
223208
if resp == nil || resp.Session != lk.sessionID || string(resp.Value) != lk.myID {
224209
// lock is released
225-
lk.mutex.Lock()
226-
lk.isAcquired = false
227-
lk.mutex.Unlock()
210+
lk.setAcquired(false)
228211

229212
log.Infof("Lost lock %s", lk.name, lk.myID)
230213

@@ -248,9 +231,7 @@ func (lk *consulLock) acquireLock() {
248231
log.Infof("Acquired lock %s/%s", lk.name, lk.myID)
249232

250233
// Mark the lock as acquired
251-
lk.mutex.Lock()
252-
lk.isAcquired = true
253-
lk.mutex.Unlock()
234+
lk.setAcquired(true)
254235

255236
// Send acquired message to event channel
256237
lk.eventChan <- LockEvent{EventType: LockAcquired}
@@ -263,3 +244,55 @@ func (lk *consulLock) acquireLock() {
263244
waitIdx = meta.LastIndex
264245
}
265246
}
247+
248+
// setAcquired marks the lock as acquired/not
249+
func (lk *consulLock) setAcquired(isAcquired bool) {
250+
lk.mutex.Lock()
251+
lk.isAcquired = isAcquired
252+
lk.mutex.Unlock()
253+
}
254+
255+
// createSession creates a consul-session for the lock
256+
func (lk *consulLock) createSession() error {
257+
// session configuration
258+
sessCfg := api.SessionEntry{
259+
Name: lk.keyName,
260+
Behavior: "delete",
261+
LockDelay: 10 * time.Millisecond,
262+
TTL: lk.ttl,
263+
}
264+
265+
// Create consul session
266+
sessionID, _, err := lk.client.Session().CreateNoChecks(&sessCfg, nil)
267+
if err != nil {
268+
log.Errorf("Error Creating session for lock %s. Err: %v", lk.keyName, err)
269+
return err
270+
}
271+
272+
log.Infof("Created session: %s for lock %s/%s", sessionID, lk.name, lk.myID)
273+
274+
// save the session ID for later
275+
lk.mutex.Lock()
276+
lk.sessionID = sessionID
277+
lk.mutex.Unlock()
278+
279+
return nil
280+
}
281+
282+
// renewSession keeps the session alive.. If a session expires, it creates new one..
283+
func (lk *consulLock) renewSession() {
284+
for {
285+
err := lk.client.Session().RenewPeriodic(lk.ttl, lk.sessionID, nil, lk.stopChan)
286+
if err == nil || lk.IsReleased() {
287+
// If lock was released, exit this go routine
288+
return
289+
}
290+
291+
// Create new consul session
292+
err = lk.createSession()
293+
if err != nil {
294+
log.Errorf("Error Creating session for lock %s. Err: %v", lk.keyName, err)
295+
}
296+
}
297+
298+
}

consulService.go

+31-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (cp *ConsulClient) RegisterService(serviceInfo ServiceInfo) error {
117117

118118
// Run refresh in background
119119
stopChan := make(chan struct{})
120-
go cp.renewService(keyName, sessCfg.TTL, sessionID, stopChan)
120+
go cp.renewService(keyName, sessCfg.TTL, sessionID, jsonVal, stopChan)
121121

122122
// Store it in DB
123123
cp.serviceDb[keyName] = &consulServiceState{
@@ -259,14 +259,43 @@ func (cp *ConsulClient) DeregisterService(serviceInfo ServiceInfo) error {
259259
}
260260

261261
//--------------------- Internal functions -------------------
262-
func (cp *ConsulClient) renewService(keyName, ttl, sessionID string, stopChan chan struct{}) {
262+
func (cp *ConsulClient) renewService(keyName, ttl, sessionID string, jsonVal []byte, stopChan chan struct{}) {
263263
for {
264264
err := cp.client.Session().RenewPeriodic(ttl, sessionID, nil, stopChan)
265265
if err == nil {
266266
log.Infof("Stoping renew on %s", keyName)
267267
return
268268
}
269269
log.Infof("RenewPeriodic for session %s exited with error: %v. Retrying..", keyName, err)
270+
271+
// session configuration
272+
sessCfg := api.SessionEntry{
273+
Name: keyName,
274+
Behavior: "delete",
275+
LockDelay: 10 * time.Millisecond,
276+
TTL: ttl,
277+
}
278+
279+
// Create consul session
280+
sessionID, _, err = cp.client.Session().CreateNoChecks(&sessCfg, nil)
281+
if err != nil {
282+
log.Errorf("Error Creating session for lock %s. Err: %v", keyName, err)
283+
}
284+
285+
// Delete the old key if it exists..
286+
log.Infof("Deleting old service entry for key %s", keyName)
287+
_, err = cp.client.KV().Delete(keyName, nil)
288+
if err != nil {
289+
log.Errorf("Error deleting key %s. Err: %v", keyName, err)
290+
}
291+
292+
// Set it via consul client
293+
succ, _, err := cp.client.KV().Acquire(&api.KVPair{Key: keyName, Value: jsonVal, Session: sessionID}, nil)
294+
if err != nil {
295+
log.Errorf("Error setting key %s, Err: %v", keyName, err)
296+
} else if !succ {
297+
log.Errorf("Failed to acquire key %s. Already acquired", keyName)
298+
}
270299
}
271300
}
272301

etcdLock.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,13 @@ func (lk *etcdLock) acquireLock() {
147147
for {
148148
log.Infof("Getting the lock %s to see if its acquired", keyName)
149149
// Get the key and see if we or someone else has already acquired the lock
150-
resp, err := lk.kapi.Get(context.Background(), keyName, nil)
150+
resp, err := lk.kapi.Get(context.Background(), keyName, &client.GetOptions{Quorum: true})
151151
if err != nil {
152152
if !client.IsKeyNotFound(err) {
153153
log.Errorf("Error getting the key %s. Err: %v", keyName, err)
154154
// Retry after a second in case of error
155155
time.Sleep(time.Second)
156+
continue
156157
} else {
157158
log.Infof("Lock %s does not exist. trying to acquire it", keyName)
158159
}

0 commit comments

Comments
 (0)