diff --git a/api.go b/api.go
index 2b38798b..54690257 100644
--- a/api.go
+++ b/api.go
@@ -73,6 +73,10 @@ var (
 	// ErrLeadershipTransferInProgress is returned when the leader is rejecting
 	// client requests because it is attempting to transfer leadership.
 	ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress")
+
+	// ErrIncompatibleLogStore is returned when the log store does not
+	// implement the methods required by the current configuration.
+	ErrIncompatibleLogStore = errors.New("log store does not implement required methods")
 )
 
 // Raft implements a Raft node.
@@ -222,6 +226,10 @@ type Raft struct {
 	// legacy metrics are those that have `_peer_name` as metric suffix instead as labels.
 	// e.g: raft_replication_heartbeat_peer0
 	noLegacyTelemetry bool
+
+	// RestoreCommittedLogs enables applying already-committed logs to the FSM
+	// on startup. See Config.RestoreCommittedLogs for details.
+	RestoreCommittedLogs bool
 }
 
 // BootstrapCluster initializes a server's storage with the given cluster
@@ -575,6 +583,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
 		preVoteDisabled:      conf.PreVoteDisabled || !transportSupportPreVote,
 		noLegacyTelemetry:    conf.NoLegacyTelemetry,
+		RestoreCommittedLogs: conf.RestoreCommittedLogs,
 	}
 	if !transportSupportPreVote && !conf.PreVoteDisabled {
 		r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
@@ -594,9 +603,14 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		return nil, err
 	}
 
+	if err := r.restoreFromCommittedLogs(); err != nil {
+		return nil, err
+	}
+
 	// Scan through the log for any configuration change entries.
 	snapshotIndex, _ := r.getLastSnapshot()
-	for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
+	lastAppliedIndex := r.getLastApplied()
+	for index := max(snapshotIndex, lastAppliedIndex) + 1; index <= lastLog.Index; index++ {
 		var entry Log
 		if err := r.logs.GetLog(index, &entry); err != nil {
 			r.logger.Error("failed to get log", "index", index, "error", err)
@@ -706,6 +720,45 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
 	return true
 }
 
+// restoreFromCommittedLogs recovers the Raft node from committed logs.
+func (r *Raft) restoreFromCommittedLogs() error {
+	if !r.RestoreCommittedLogs {
+		return nil
+	}
+
+	// If the store implements CommitTrackingLogStore, we can read the commit index
+	// from it and apply committed logs to the FSM here, instead of waiting to learn the commit index from the leader.
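+	//
+	// If restore committed logs mode is enabled but the store cannot track the
+	// commit index, fail fast with ErrIncompatibleLogStore so the misconfiguration
+	// is caught at startup rather than silently ignored.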
+	store, ok := r.logs.(CommitTrackingLogStore)
+	if !ok {
+		r.logger.Warn("restore committed logs enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs))
+		return ErrIncompatibleLogStore
+	}
+
+	commitIndex, err := store.GetCommitIndex()
+	if err != nil {
+		r.logger.Error("failed to get commit index from store", "error", err)
+		return err
+	}
+
+	lastIndex, err := r.logs.LastIndex()
+	if err != nil {
+		r.logger.Error("failed to get last log index from store", "error", err)
+		return err
+	}
+	// Never apply beyond the last log entry actually stored.
+	if commitIndex > lastIndex {
+		commitIndex = lastIndex
+	}
+
+	r.setCommitIndex(commitIndex)
+	r.processLogs(commitIndex, nil)
+	return nil
+}
+
 func (r *Raft) config() Config {
 	return r.conf.Load().(Config)
 }
diff --git a/config.go b/config.go
index 0f586973..4b591326 100644
--- a/config.go
+++ b/config.go
@@ -240,6 +240,19 @@ type Config struct {
 	// e.g: raft_replication_heartbeat_peer0
 	NoLegacyTelemetry bool
 
+	// RestoreCommittedLogs controls whether the Raft server uses the restore
+	// committed logs mechanism, which requires a LogStore implementation that
+	// supports commit tracking. When such a store is used and this option is
+	// enabled, raft nodes replay all known-committed logs on disk before
+	// completing `NewRaft` on startup. This is mainly useful where the
+	// application allows relaxed-consistency reads from followers, as it
+	// reduces how far behind the follower's FSM is when the node starts. If
+	// all reads are forwarded to the leader, this feature has no observable benefit.
+	//
+	// Note: If this is enabled, the log store MUST implement the CommitTrackingLogStore
+	// interface. Otherwise, Raft will fail to start and return ErrIncompatibleLogStore.
+	RestoreCommittedLogs bool
+
 	// skipStartup allows NewRaft() to bypass all background work goroutines
 	skipStartup bool
 }
diff --git a/inmem_store.go b/inmem_store.go
index 730d03f2..14cc5c5e 100644
--- a/inmem_store.go
+++ b/inmem_store.go
@@ -6,8 +6,11 @@ package raft
 import (
 	"errors"
 	"sync"
+	"sync/atomic"
 )
 
+var _ CommitTrackingLogStore = &InmemCommitTrackingStore{}
+
 // InmemStore implements the LogStore and StableStore interface.
 // It should NOT EVER be used for production. It is used only for
 // unit tests. Use the MDBStore implementation instead.
@@ -131,3 +134,31 @@ func (i *InmemStore) GetUint64(key []byte) (uint64, error) {
 	defer i.l.RUnlock()
 	return i.kvInt[string(key)], nil
 }
+
+// InmemCommitTrackingStore extends InmemStore with commit index tracking.
+// It should NOT EVER be used for production. It is used only for unit tests.
+type InmemCommitTrackingStore struct {
+	InmemStore
+	commitIndex atomic.Uint64
+}
+
+// NewInmemCommitTrackingStore returns a new in-memory backend that tracks the
+// commit index. Do not ever use for production. Only for testing.
+func NewInmemCommitTrackingStore() *InmemCommitTrackingStore {
+	i := &InmemCommitTrackingStore{
+		InmemStore: *NewInmemStore(),
+	}
+	return i
+}
+
+// StageCommitIndex stores the commit index. Nothing is persisted to disk in
+// this store, so there is nothing to stage atomically with the next StoreLogs.
+func (i *InmemCommitTrackingStore) StageCommitIndex(index uint64) error {
+	i.commitIndex.Store(index)
+	return nil
+}
+
+// GetCommitIndex returns the latest staged commit index.
+func (i *InmemCommitTrackingStore) GetCommitIndex() (uint64, error) {
+	return i.commitIndex.Load(), nil
+}
diff --git a/log.go b/log.go
index 4a6dc47b..e93ef318 100644
--- a/log.go
+++ b/log.go
@@ -190,3 +190,38 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st
 		}
 	}
 }
+
+// CommitTrackingLogStore is a LogStore that additionally tracks and persists
+// the commit index alongside the log entries, allowing a restarting node to
+// apply already-committed logs to its FSM before hearing from a leader.
+type CommitTrackingLogStore interface {
+	LogStore
+
+	// StageCommitIndex stages a new commit index to be persisted.
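+	// For example, a disk-backed implementation might hold idx in memory and
+	// write it in the same batch or transaction as the entries passed to the
+	// next StoreLogs call.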
+	// The staged commit index MUST only be persisted in a manner that is atomic
+	// with the following StoreLogs call in the face of a crash.
+	// This allows the Raft implementation to optimize commit index updates
+	// without risking inconsistency between the commit index and the log entries.
+	//
+	// The implementation MUST NOT persist this value separately from the log entries.
+	// Instead, it should stage the value to be written atomically with the next
+	// StoreLogs call.
+	//
+	// GetCommitIndex MUST never return a value higher than the last index in the log,
+	// even if a higher value has been staged with this method.
+	//
+	// idx is the new commit index to stage.
+	StageCommitIndex(idx uint64) error
+
+	// GetCommitIndex returns the commit index most recently persisted alongside
+	// the log entries; it is read at startup.
+	//
+	// GetCommitIndex should not return a value higher than the last index in the log.
+	// If that happens, the last index in the log will be used instead.
+	//
+	// When no commit index is found in the log store, return (0, nil).
+	GetCommitIndex() (uint64, error)
+}
diff --git a/raft.go b/raft.go
index df5cea3f..7213dc24 100644
--- a/raft.go
+++ b/raft.go
@@ -1262,6 +1262,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
 		r.leaderState.inflight.PushBack(applyLog)
 	}
 
+	// Stage the commit index so it is persisted atomically with the entries below.
+	commitIndex := r.getCommitIndex()
+	r.tryStageCommitIndex(commitIndex)
+
 	// Write the log entry locally
 	if err := r.logs.StoreLogs(logs); err != nil {
 		r.logger.Error("failed to commit logs", "error", err)
@@ -1385,6 +1388,21 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple {
 	return nil
 }
 
+// tryStageCommitIndex stages the commit index in the persistent log store if
+// restore committed logs mode is enabled and the store supports commit tracking.
+func (r *Raft) tryStageCommitIndex(commitIndex uint64) {
+	if !r.RestoreCommittedLogs {
+		return
+	}
+	store, ok := r.logs.(CommitTrackingLogStore)
+	if !ok {
+		return
+	}
+	if err := store.StageCommitIndex(commitIndex); err != nil {
+		r.logger.Error("failed to stage commit index in commit tracking log store", "index", commitIndex, "error", err)
+	}
+}
+
 // processRPC is called to handle an incoming RPC request. This must only be
 // called from the main thread.
 func (r *Raft) processRPC(rpc RPC) {
@@ -1535,6 +1552,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
 	}
 
 	if n := len(newEntries); n > 0 {
+		// Stage the commit index so it is persisted atomically with the new entries
+		lastNewIndex := newEntries[n-1].Index
+		commitIndex := min(a.LeaderCommitIndex, lastNewIndex)
+		r.tryStageCommitIndex(commitIndex)
+
 		// Append the new entries
 		if err := r.logs.StoreLogs(newEntries); err != nil {
 			r.logger.Error("failed to append to logs", "error", err)
diff --git a/raft_test.go b/raft_test.go
index bd94812c..1fce8e56 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -621,27 +621,30 @@ func TestRaft_JoinNode(t *testing.T) {
 func TestRaft_JoinNode_ConfigStore(t *testing.T) {
 	// Make a cluster
 	conf := inmemConfig(t)
-	c := makeCluster(t, &MakeClusterOpts{
+	c, err := makeCluster(t, &MakeClusterOpts{
 		Peers:          1,
 		Bootstrap:      true,
 		Conf:           conf,
 		ConfigStoreFSM: true,
 	})
+	require.NoError(t, err)
 	defer c.Close()
 
 	// Make a new nodes
-	c1 := makeCluster(t, &MakeClusterOpts{
+	c1, err := makeCluster(t, &MakeClusterOpts{
 		Peers:          1,
 		Bootstrap:      false,
 		Conf:           conf,
 		ConfigStoreFSM: true,
 	})
-	c2 := makeCluster(t, &MakeClusterOpts{
+	require.NoError(t, err)
+	c2, err := makeCluster(t, &MakeClusterOpts{
 		Peers:          1,
 		Bootstrap:      false,
 		Conf:           conf,
 		ConfigStoreFSM: true,
 	})
+	require.NoError(t, err)
 
 	// Merge clusters
 	c.Merge(c1)
@@ -1095,6 +1098,202 @@ func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) {
 	assert.Equal(t, lastIdx, last)
 }
 
+func TestRaft_RestoreSnapshotOnStartup_CommitTrackingLogs(t *testing.T) {
+	// Make the cluster
+	conf := inmemConfig(t)
+	conf.TrailingLogs = 10
+	opts := &MakeClusterOpts{
+		Peers:              1,
+		Bootstrap:          true,
+		Conf:               conf,
+		CommitTrackingLogs: true,
+	}
+	c := MakeClusterCustom(t, opts)
+	defer c.Close()
+
+	leader := c.Leader()
+
+	// Commit a lot of things
+	var future Future
+	for i := 0; i < 100; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
+	}
+
+	// Wait for the last future to apply
+	if err := future.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Take a snapshot
+	snapFuture := leader.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Check for snapshot
+	snaps, _ := leader.snapshots.List()
+	if len(snaps) != 1 {
+		t.Fatalf("should have a snapshot")
+	}
+	snap := snaps[0]
+
+	// Logs should be trimmed
+	firstIdx, err := leader.logs.FirstIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	lastIdx, err := leader.logs.LastIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if firstIdx != snap.Index-conf.TrailingLogs+1 {
+		t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx)
+	}
+
+	// Shutdown
+	shutdown := leader.Shutdown()
+	if err := shutdown.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Restart the Raft
+	r := leader
+	// Can't just reuse the old transport as it will be closed
+	_, trans2 := NewInmemTransport(r.trans.LocalAddr())
+	cfg := r.config()
+	r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	c.rafts[0] = r
+
+	// We should have restored from the snapshot!
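+	// lastApplied should equal the snapshot index because nothing was
+	// committed after the snapshot was taken.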
+	if last := r.getLastApplied(); last != snap.Index {
+		t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
+	}
+
+	// Verify that logs have not been reset
+	first, _ := r.logs.FirstIndex()
+	last, _ := r.logs.LastIndex()
+	assert.Equal(t, firstIdx, first)
+	assert.Equal(t, lastIdx, last)
+}
+
+func TestRaft_RestoreCommittedLogs(t *testing.T) {
+	// Make the cluster
+	conf := inmemConfig(t)
+	conf.TrailingLogs = 10
+	conf.RestoreCommittedLogs = true
+	opts := &MakeClusterOpts{
+		Peers:              1,
+		Bootstrap:          true,
+		Conf:               conf,
+		CommitTrackingLogs: true,
+	}
+	c := MakeClusterCustom(t, opts)
+	defer c.Close()
+
+	leader := c.Leader()
+
+	// Commit a lot of things
+	var future Future
+	for i := 0; i < 100; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
+	}
+
+	// Wait for the last future to apply
+	if err := future.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Take a snapshot
+	snapFuture := leader.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Check for snapshot
+	snaps, _ := leader.snapshots.List()
+	if len(snaps) != 1 {
+		t.Fatalf("should have a snapshot")
+	}
+	snap := snaps[0]
+
+	// Logs should be trimmed
+	firstIdx, err := leader.logs.FirstIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if firstIdx != snap.Index-conf.TrailingLogs+1 {
+		t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx)
+	}
+
+	// Commit a lot of things (for restore committed logs test)
+	for i := 0; i < 100; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
+	}
+
+	// Wait for the last future to apply
+	if err := future.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Shutdown
+	shutdown := leader.Shutdown()
+	if err := shutdown.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Restart the Raft
+	r := leader
+	// Can't just reuse the old transport as it will be closed
+	_, trans2 := NewInmemTransport(r.trans.LocalAddr())
+	cfg := r.config()
+	r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	c.rafts[0] = r
+
+	store, ok := r.logs.(CommitTrackingLogStore)
+	if !ok {
+		t.Fatal("err: raft log store does not implement CommitTrackingLogStore interface")
+	}
+	commitIdx, err := store.GetCommitIndex()
+	require.NoError(t, err)
+	// We should have applied all committed logs
+	if last := r.getLastApplied(); last != commitIdx {
+		t.Fatalf("bad last index: %d, expecting %d", last, commitIdx)
+	}
+
+	// Expect: snap.Index <= commitIdx <= lastIdx
+	lastIdx, err := r.logs.LastIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	assert.LessOrEqual(t, snap.Index, commitIdx)
+	assert.LessOrEqual(t, commitIdx, lastIdx)
+}
+
+func TestRaft_RestoreCommittedLogs_IncompatibleLogStore(t *testing.T) {
+	// Make the cluster
+	conf := inmemConfig(t)
+	conf.TrailingLogs = 10
+	conf.RestoreCommittedLogs = true
+	opts := &MakeClusterOpts{
+		Peers:              1,
+		Bootstrap:          true,
+		Conf:               conf,
+		CommitTrackingLogs: false,
+	}
+	_, err := MakeClusterCustomWithErr(t, opts)
+	require.ErrorIs(t, err, ErrIncompatibleLogStore)
+}
+
 func TestRaft_SnapshotRestore_Progress(t *testing.T) {
 	// Make the cluster
 	conf := inmemConfig(t)
diff --git a/testing.go b/testing.go
index 351a9aba..0683155f 100644
--- a/testing.go
+++ b/testing.go
@@ -717,20 +717,22 @@ WAIT:
 
 // NOTE: This is exposed for middleware testing purposes and is not a stable API
 type MakeClusterOpts struct {
-	Peers           int
-	Bootstrap       bool
-	Conf            *Config
-	ConfigStoreFSM  bool
-	MakeFSMFunc     func() FSM
-	LongstopTimeout time.Duration
-	MonotonicLogs   bool
+	Peers              int
+	Bootstrap          bool
+	Conf               *Config
+	ConfigStoreFSM     bool
+	MakeFSMFunc        func() FSM
+	LongstopTimeout    time.Duration
+	MonotonicLogs      bool
+	CommitTrackingLogs bool
+	PropagateError     bool // If true, return errors instead of calling t.Fatal
 }
 
 // makeCluster will return a cluster with the given config and number of peers.
 // If bootstrap is true, the servers will know about each other before starting,
 // otherwise their transports will be wired up but they won't yet have configured
 // each other.
-func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
+func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) {
 	if opts.Conf == nil {
 		opts.Conf = inmemConfig(t)
 	}
@@ -756,6 +758,9 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
 	for i := 0; i < opts.Peers; i++ {
 		dir, err := os.MkdirTemp("", "raft")
 		if err != nil {
+			if opts.PropagateError {
+				return nil, fmt.Errorf("failed to create temp dir: %w", err)
+			}
 			t.Fatalf("err: %v", err)
 		}
 
@@ -807,6 +812,8 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
 
 		if opts.MonotonicLogs {
 			logs = &MockMonotonicLogStore{s: logs}
+		} else if opts.CommitTrackingLogs {
+			logs = NewInmemCommitTrackingStore()
 		}
 
 		peerConf := opts.Conf
@@ -816,44 +823,71 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
 		if opts.Bootstrap {
 			err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration)
 			if err != nil {
+				if opts.PropagateError {
+					return nil, fmt.Errorf("BootstrapCluster failed: %w", err)
+				}
 				t.Fatalf("BootstrapCluster failed: %v", err)
 			}
 		}
 
 		raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans)
 		if err != nil {
+			if opts.PropagateError {
+				return nil, fmt.Errorf("NewRaft failed: %w", err)
+			}
 			t.Fatalf("NewRaft failed: %v", err)
 		}
 
 		raft.RegisterObserver(NewObserver(c.observationCh, false, nil))
 		if err != nil {
+			if opts.PropagateError {
+				return nil, fmt.Errorf("RegisterObserver failed: %w", err)
+			}
 			t.Fatalf("RegisterObserver failed: %v", err)
 		}
 		c.rafts = append(c.rafts, raft)
 	}
 
-	return c
+	return c, nil
 }
 
 // NOTE: This is exposed for middleware testing purposes and is not a stable API
 func MakeCluster(n int, t *testing.T, conf *Config) *cluster {
-	return makeCluster(t, &MakeClusterOpts{
+	c, err := makeCluster(t, &MakeClusterOpts{
 		Peers:     n,
 		Bootstrap: true,
 		Conf:      conf,
 	})
+	if err != nil {
+		t.Fatalf("failed to make cluster: %v", err)
+	}
+	return c
 }
 
 // NOTE: This is exposed for middleware testing purposes and is not a stable API
 func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) *cluster {
-	return makeCluster(t, &MakeClusterOpts{
+	c, err := makeCluster(t, &MakeClusterOpts{
 		Peers: n,
 		Conf:  conf,
 	})
+	if err != nil {
+		t.Fatalf("failed to make cluster: %v", err)
+	}
+	return c
 }
 
 // NOTE: This is exposed for middleware testing purposes and is not a stable API
 func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) *cluster {
+	c, err := makeCluster(t, opts)
+	if err != nil {
+		t.Fatalf("failed to make cluster: %v", err)
+	}
+	return c
+}
+
+// NOTE: This is exposed for middleware testing purposes and is not a stable API
+func MakeClusterCustomWithErr(t *testing.T, opts *MakeClusterOpts) (*cluster, error) {
+	opts.PropagateError = true
 	return makeCluster(t, opts)
 }
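For store implementers, the CommitTrackingLogStore contract added in log.go reduces to two rules: persist the staged index atomically with the next StoreLogs batch, and never report an index past the end of the log. Below is a minimal sketch of a wrapper that satisfies those rules. It is not part of this diff: `TrackingStore` and the wrapped `LogStore` are illustrative assumptions, and a real implementation would write the staged index in the same batch or transaction as the entries rather than in a separate step as modeled here.

```go
package store

import (
	"sync"

	"github.com/hashicorp/raft"
)

// TrackingStore wraps an existing raft.LogStore and tracks a commit index.
// The staged index only becomes visible to GetCommitIndex once it has been
// persisted together with a StoreLogs call, mirroring the interface's
// atomicity requirement.
type TrackingStore struct {
	raft.LogStore

	mu        sync.Mutex
	staged    uint64 // set by StageCommitIndex, not yet persisted
	persisted uint64 // value recorded with the last StoreLogs batch
}

// StageCommitIndex records idx to be persisted with the next StoreLogs call.
func (s *TrackingStore) StageCommitIndex(idx uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.staged = idx
	return nil
}

// StoreLogs persists the entries and, with them, the staged commit index.
// A real implementation must write both in one atomic batch or transaction;
// this sketch only models the visibility rules.
func (s *TrackingStore) StoreLogs(logs []*raft.Log) error {
	s.mu.Lock()
	staged := s.staged
	s.mu.Unlock()

	if err := s.LogStore.StoreLogs(logs); err != nil {
		return err
	}

	s.mu.Lock()
	s.persisted = staged
	s.mu.Unlock()
	return nil
}

// GetCommitIndex returns the commit index persisted alongside the log,
// clamped so it never exceeds the last stored entry.
func (s *TrackingStore) GetCommitIndex() (uint64, error) {
	last, err := s.LogStore.LastIndex()
	if err != nil {
		return 0, err
	}

	s.mu.Lock()
	idx := s.persisted
	s.mu.Unlock()

	if idx > last {
		idx = last
	}
	return idx, nil
}
```

With such a store wired in, enabling the feature is a single config flag: set `conf.RestoreCommittedLogs = true` and pass the store as the `LogStore` argument to `NewRaft`.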