|
73 | 73 | // ErrLeadershipTransferInProgress is returned when the leader is rejecting
|
74 | 74 | // client requests because it is attempting to transfer leadership.
|
75 | 75 | ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress")
|
| 76 | + |
| 77 | + // ErrIncompatibleLogStore is returned when the log store does not support |
| 78 | + // or implement some required methods. |
| 79 | + ErrIncompatibleLogStore = errors.New("log store does not implement some required methods or malformed") |
76 | 80 | )
|
77 | 81 |
|
78 | 82 | // Raft implements a Raft node.
|
@@ -590,7 +594,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
|
590 | 594 | return nil, err
|
591 | 595 | }
|
592 | 596 |
|
593 |
| - r.recoverFromCommittedLogs() |
| 597 | + if err := r.recoverFromCommittedLogs(); err != nil { |
| 598 | + return nil, err |
| 599 | + } |
594 | 600 |
|
595 | 601 | // Scan through the log for any configuration change entries.
|
596 | 602 | snapshotIndex, _ := r.getLastSnapshot()
|
@@ -706,36 +712,37 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
|
706 | 712 | }
|
707 | 713 |
|
708 | 714 | // recoverFromCommittedLogs recovers the Raft node from committed logs.
|
709 |
| -func (r *Raft) recoverFromCommittedLogs() { |
| 715 | +func (r *Raft) recoverFromCommittedLogs() error { |
710 | 716 | if !r.RestoreCommittedLogs {
|
711 |
| - return |
| 717 | + return nil |
712 | 718 | }
|
713 | 719 |
|
714 | 720 | // If the store implements CommitTrackingLogStore, we can read the commit index from the store.
|
715 | 721 | // This is useful when the store is able to track the commit index and we can avoid replaying logs.
|
716 | 722 | store, ok := r.logs.(CommitTrackingLogStore)
|
717 | 723 | if !ok {
|
718 |
| - r.logger.Warn("fast recovery enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs)) |
719 |
| - return |
| 724 | + r.logger.Warn("restore committed logs enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs)) |
| 725 | + return ErrIncompatibleLogStore |
720 | 726 | }
|
721 | 727 |
|
722 | 728 | commitIndex, err := store.GetCommitIndex()
|
723 | 729 | if err != nil {
|
724 | 730 | r.logger.Error("failed to get commit index from store", "error", err)
|
725 |
| - panic(err) |
| 731 | + return err |
726 | 732 | }
|
727 | 733 |
|
728 | 734 | lastIndex, err := r.logs.LastIndex()
|
729 | 735 | if err != nil {
|
730 | 736 | r.logger.Error("failed to get last log index from store", "error", err)
|
731 |
| - panic(err) |
| 737 | + return err |
732 | 738 | }
|
733 | 739 | if commitIndex > lastIndex {
|
734 | 740 | commitIndex = lastIndex
|
735 | 741 | }
|
736 | 742 |
|
737 | 743 | r.setCommitIndex(commitIndex)
|
738 | 744 | r.processLogs(commitIndex, nil)
|
| 745 | + return nil |
739 | 746 | }
|
740 | 747 |
|
741 | 748 | func (r *Raft) config() Config {
|
|
0 commit comments