Skip to content

Make Raft.LeaderCh() return a new channel for each invocation #427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ type Raft struct {
// leaderCh is used to notify of leadership changes
leaderCh chan bool

leaderChs []chan bool
leaderChsLock sync.Mutex
// leaderChLastMessage is the last message sent over leaderCh that has been processed.
// It is sent to new channels created by LeaderCh().
leaderChLastMessage bool

// leaderState used only while state is leader
leaderState leaderState

Expand Down Expand Up @@ -957,14 +963,37 @@ func (r *Raft) State() RaftState {
// lose it.
//
// Receivers can expect to receive a notification only if leadership
// transition has occured.
// transition has occured and immediately after LeaderCh() returns with the
// current state.
//
// If receivers aren't ready for the signal, signals may drop and only the
// latest leadership transition. For example, if a receiver receives subsequent
// `true` values, they may deduce that leadership was lost and regained while
// the the receiver was processing first leadership transition.
// the receiver was processing first leadership transition.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
ch := make(chan bool, 1)
r.leaderChsLock.Lock()
if len(r.leaderChs) == 0 {
select {
case v := <-r.leaderCh:
r.leaderChLastMessage = v
default:
}
go func() {
for v := range r.leaderCh {
r.leaderChsLock.Lock()
r.leaderChLastMessage = v
for _, c := range r.leaderChs {
overrideNotifyBool(c, v)
}
r.leaderChsLock.Unlock()
}
}()
}
r.leaderChs = append(r.leaderChs, ch)
ch <- r.leaderChLastMessage
r.leaderChsLock.Unlock()
return ch
}

// String returns a string representation of this Raft node.
Expand Down
17 changes: 10 additions & 7 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,15 @@ func TestRaft_SingleNode(t *testing.T) {
raft := c.rafts[0]

// Watch leaderCh for change
select {
case v := <-raft.LeaderCh():
if !v {
c.FailNowf("should become leader")
ch := raft.LeaderCh()
isLeader := false
for !isLeader {
select {
case v := <-ch:
isLeader = v
case <-time.After(conf.HeartbeatTimeout * 3):
c.FailNowf("timeout becoming leader")
}
case <-time.After(conf.HeartbeatTimeout * 3):
c.FailNowf("timeout becoming leader")
}

// Should be leader
Expand Down Expand Up @@ -1507,10 +1509,11 @@ func TestRaft_LeaderLeaseExpire(t *testing.T) {

// Watch the leaderCh
timeout := time.After(conf.LeaderLeaseTimeout * 2)
lch := leader.LeaderCh()
LOOP:
for {
select {
case v := <-leader.LeaderCh():
case v := <-lch:
if !v {
break LOOP
}
Expand Down