Add WaitCommitted #640

Open
wants to merge 1 commit into
base: main
61 changes: 58 additions & 3 deletions future.go
@@ -36,6 +36,12 @@ type IndexFuture interface {
type ApplyFuture interface {
	IndexFuture

	// WaitCommitted blocks until the log entry has been committed to quorum.
	// It does not wait for FSM application.
	// The error returned follows the same semantics as the Error method,
	// except for errors that occur after the log entry has been committed.
	WaitCommitted() error
Comment on lines +39 to +43
Author

@kolesnikovae kolesnikovae Apr 4, 2025

There is, however, an implicit dependency on the applyCh buffer size. When BatchApplyCh is set to true, the buffer size is taken from MaxAppendEntries (default 64, maximum 1024). Once the buffer is full, Apply blocks.

	// MaxAppendEntries controls the maximum number of append entries
	// to send at once. We want to strike a balance between efficiency
	// and avoiding waste if the follower is going to reject because of
	// an inconsistent log.

I'd like to control the applyCh buffer size explicitly (as 1024 is fairly conservative) via a separate option, e.g., ApplyBufferSize, which would default to MaxAppendEntries and must not be less than that. The option could supersede the BatchApplyCh option.
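
To make the proposal concrete, here is a rough sketch of how such an option could be resolved and validated. `ApplyBufferSize` does not exist in the library today, and the `Config` struct, `applyBufferSize` helper, and error value below are hypothetical stand-ins, not the real types. The tail of `main` only illustrates what "Apply blocks" means for a full buffered channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors only the fields discussed in this comment.
// ApplyBufferSize is the hypothetical new option (0 means "not set").
type Config struct {
	MaxAppendEntries int
	ApplyBufferSize  int
}

var errApplyBufferTooSmall = errors.New("ApplyBufferSize must not be less than MaxAppendEntries")

// applyBufferSize resolves the buffer size: it defaults to MaxAppendEntries
// and rejects explicit values below that.
func applyBufferSize(c Config) (int, error) {
	if c.ApplyBufferSize == 0 {
		return c.MaxAppendEntries, nil
	}
	if c.ApplyBufferSize < c.MaxAppendEntries {
		return 0, errApplyBufferTooSmall
	}
	return c.ApplyBufferSize, nil
}

func main() {
	size, err := applyBufferSize(Config{MaxAppendEntries: 2})
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}

	// Fill the buffer, then attempt one more send without blocking.
	ch := make(chan int, size)
	for i := 0; i < size; i++ {
		ch <- i
	}
	select {
	case ch <- 0:
		fmt.Println("sent")
	default:
		fmt.Println("buffer full: a plain send would block here")
	}
}
```

Whether an unset value should keep today's behaviour instead of defaulting to MaxAppendEntries is an open design question that this sketch does not try to answer.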


	// Response returns the FSM response as returned by the FSM.Apply method. This
	// must not be called until after the Error method has returned.
	// Note that if FSM.Apply returns an error, it will be returned by Response,
@@ -87,6 +93,10 @@ func (e errorFuture) Index() uint64 {
	return 0
}

func (e errorFuture) WaitCommitted() error {
	return e.err
}

// deferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
@@ -151,9 +161,15 @@ type bootstrapFuture struct {
// the log is considered committed.
type logFuture struct {
	deferError
-	log      Log
-	response interface{}
-	dispatch time.Time
+	log       Log
+	response  interface{}
+	dispatch  time.Time
+	committed chan struct{}
}

func (l *logFuture) init() {
	l.committed = make(chan struct{})
	l.deferError.init()
}

func (l *logFuture) Response() interface{} {
@@ -164,6 +180,45 @@ func (l *logFuture) Index() uint64 {
	return l.log.Index
}

func (l *logFuture) WaitCommitted() error {
	select {
	default:
	case <-l.committed:
		// If the entry is committed, errors are irrelevant because quorum
		// agreement ensures safety. If an error occurs before commitment,
		// it must be returned (e.g., leadership loss).
		return nil
	}
	if l.err == nil {
		if l.errCh == nil {
			panic("waiting for response on nil channel")
		}
		select {
		case <-l.committed:
		case l.err = <-l.errCh:
			// If the error is nil, it means that the command
			// has been applied to the FSM already. The l.committed
			// channel is also closed as the command is guaranteed
			// to be committed.
			//
			// In this case, if Error is called after WaitCommitted,
			// the method will not block, and the caller will be
			// able to access the response safely. Otherwise, if the
			// response has not been received yet, Error will block.
			//
			// The same is true for the ShutdownCh.
		case <-l.ShutdownCh:
			l.err = ErrRaftShutdown
		}
	}
	select {
	case <-l.committed:
		return nil
	default:
		return l.err
	}
}
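
The interplay between the committed signal and the error channel can be tried in isolation. The sketch below is a simplified stand-in, not the library type: `toyFuture`, `newToyFuture`, `respond`, `waitCommitted`, and `errLost` are illustrative names, and the shutdown channel is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

var errLost = errors.New("leadership lost")

// toyFuture is a reduced model of a future with a separate commit signal.
type toyFuture struct {
	committed chan struct{} // closed once the entry is committed
	errCh     chan error    // receives the final result, then is closed
	err       error
}

func newToyFuture() *toyFuture {
	return &toyFuture{committed: make(chan struct{}), errCh: make(chan error, 1)}
}

// respond publishes the final result and closes the channel, so later
// receives return immediately with a nil error.
func (f *toyFuture) respond(err error) {
	f.errCh <- err
	close(f.errCh)
}

// waitCommitted returns nil once the entry is committed; otherwise it
// returns the error that arrived before commitment.
func (f *toyFuture) waitCommitted() error {
	select {
	case <-f.committed:
		return nil
	default:
	}
	if f.err == nil {
		select {
		case <-f.committed:
		case f.err = <-f.errCh:
		}
	}
	select {
	case <-f.committed:
		return nil
	default:
		return f.err
	}
}

func main() {
	a := newToyFuture()
	a.respond(errLost) // error arrives before commitment
	fmt.Println("error before commit:", a.waitCommitted())

	b := newToyFuture()
	close(b.committed) // committed first
	b.respond(errLost) // the late error is not reported by waitCommitted
	fmt.Println("error after commit:", b.waitCommitted())
}
```

Both channels may be ready in the middle select. Whichever case is chosen, the final select still returns nil when the commit signal is closed, so the result does not depend on the selection order.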

type shutdownFuture struct {
raft *Raft
}
32 changes: 32 additions & 0 deletions future_test.go
@@ -43,3 +43,35 @@ func TestDeferFutureConcurrent(t *testing.T) {
		t.Errorf("unexpected error result; got %#v want %#v", got, want)
	}
}

func TestLogFutureWaitCommittedError(t *testing.T) {
	assert := func(t *testing.T, want error, fn func() error) {
		t.Helper()
		if got := fn(); got != want {
			t.Fatalf("unexpected error result; got %#v want %#v", got, want)
		}
	}

	t.Run("ErrorBeforeCommitted", func(t *testing.T) {
		want := errors.New("x")
		var f logFuture
		f.init()
		f.respond(want)
		assert(t, want, f.WaitCommitted)
		assert(t, want, f.WaitCommitted)
		assert(t, want, f.Error)
		assert(t, want, f.Error)
	})

	t.Run("ErrorAfterCommitted", func(t *testing.T) {
		want := errors.New("x")
		var f logFuture
		f.init()
		close(f.committed)
		f.respond(want)
		assert(t, nil, f.WaitCommitted)
		assert(t, want, f.Error)
		assert(t, nil, f.WaitCommitted)
		assert(t, want, f.Error)
	})
}
2 changes: 2 additions & 0 deletions raft.go
@@ -569,6 +569,7 @@ func (r *Raft) runLeader() {
	// maintain that there exists at most one uncommitted configuration entry in
	// any log, so we have to do proper no-ops here.
	noop := &logFuture{log: Log{Type: LogNoop}}
	noop.init()
Author

@kolesnikovae kolesnikovae Apr 4, 2025

It seems that the no-op command issued by the leader at the beginning of a term was never initialized via init. That was probably fine before, but with the change introduced in this PR the initialization becomes necessary.
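
A small standalone sketch of why a missing init presumably matters here: a zero-value struct leaves the channel nil, and closing a nil channel panics. The `entry` type and `tryClose` helper are illustrative only and are not part of the library.

```go
package main

import "fmt"

// entry stands in for a future that carries a commit signal.
type entry struct {
	committed chan struct{}
}

func (e *entry) init() { e.committed = make(chan struct{}) }

// tryClose closes the commit channel and converts a panic into a message.
func tryClose(e *entry) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint("panic: ", r)
		}
	}()
	close(e.committed)
	return "closed"
}

func main() {
	var raw entry // zero value: committed is nil
	fmt.Println("without init:", tryClose(&raw))

	var ready entry
	ready.init()
	fmt.Println("with init:", tryClose(&ready))
}
```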

	r.dispatchLogs([]*logFuture{noop})

	// Sit in the leader loop until we step down
@@ -818,6 +819,7 @@ func (r *Raft) leaderLoop() {
				groupReady = append(groupReady, e)
				groupFutures[idx] = commitLog
				lastIdxInGroup = idx
				close(commitLog.committed)
Comment on lines 821 to +822
Author

@kolesnikovae kolesnikovae Apr 4, 2025

I went back and forth between a "safer" option with a nil check (similar to the future error channel) and a more direct approach with an unconditional close. I opted for the latter because I couldn't identify a case where the same logFuture would be committed twice. If that ever happened, it would likely lead to a bigger issue anyway.
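
For reference, the trade-off can be sketched outside of raft. An unconditional close panics on the second call, while a guarded close silently tolerates it. Note that the guard below uses sync.Once rather than the nil check mentioned above, purely as an illustration; `onceSignal`, `markCommitted`, and `closeTwice` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// onceSignal is a guarded variant: the channel is closed at most once.
type onceSignal struct {
	ch   chan struct{}
	once sync.Once
}

func newOnceSignal() *onceSignal { return &onceSignal{ch: make(chan struct{})} }

// markCommitted closes the channel on the first call and is a no-op afterwards.
func (s *onceSignal) markCommitted() { s.once.Do(func() { close(s.ch) }) }

// closeTwice closes a plain channel twice and reports whether that panicked.
func closeTwice() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	ch := make(chan struct{})
	close(ch)
	close(ch) // second close of the same channel
	return false
}

func main() {
	fmt.Println("unconditional double close panicked:", closeTwice())

	s := newOnceSignal()
	s.markCommitted()
	s.markCommitted() // tolerated by the guard
	select {
	case <-s.ch:
		fmt.Println("guarded signal is closed")
	default:
		fmt.Println("guarded signal is still open")
	}
}
```

The guarded version hides a double commit, whereas the unconditional close surfaces it immediately as a panic, which matches the reasoning for choosing the direct approach.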

			}

			// Process the group
56 changes: 56 additions & 0 deletions raft_test.go
@@ -590,6 +590,62 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) {
	t.Fatalf("Timeout waiting to detect apply timeouts")
}

func TestRaft_WaitCommittedConcurrent(t *testing.T) {
	// Make the cluster
	conf := inmemConfig(t)
	conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout
	conf.ElectionTimeout = 2 * conf.ElectionTimeout
	c := MakeCluster(3, t, conf)
	defer c.Close()

	// Wait for a leader
	leader := c.Leader()

	// Create a wait group
	const sz = 100
	var group sync.WaitGroup
	group.Add(sz)

	applyF := func(i int) {
		defer group.Done()
		future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
		if err := future.WaitCommitted(); err != nil {
			c.Failf("[ERR] err: %v", err)
		}
	}

	// Concurrently apply
	for i := 0; i < sz; i++ {
		go applyF(i)
	}

	// Wait to finish
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		group.Wait()
		// Unlike Apply, WaitCommitted does not wait for the FSM to apply
		// the commands. Therefore, we explicitly wait until the committed
		// commands are replicated and applied to all the replicas' FSM.
		// WaitForReplication times out after longstopTimeout.
		c.WaitForReplication(sz)
	}()
	select {
	case <-doneCh:
	case <-time.After(c.longstopTimeout):
		t.Fatalf("timeout")
	}

	// If anything failed up to this point then bail now, rather than do a
	// confusing compare.
	if t.Failed() {
		t.Fatalf("One or more of the apply operations failed")
	}

	// Check the FSMs
	c.EnsureSame(t)
}

func TestRaft_JoinNode(t *testing.T) {
	// Make a cluster
	c := MakeCluster(2, t, nil)