Skip to content

Commit d7afc72

Browse files
committed
Change: move default impl methods in RaftStorage to StorageHelper.
- `get_initial_state()` - `get_log_id()` - `get_membership()` - `last_membership_in_log()` In the trait `RaftStorage`, these methods provide several default methods that users do not need to care about. It should no longer be methods that user may need to implement. To upgrade: If you have been using these methods, replace `sto.xxx()` with `StorageHelper::new(&mut sto).xxx()`.
1 parent e80ca38 commit d7afc72

10 files changed

+227
-196
lines changed

openraft/src/core/raft_core.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::raft_types::LogIdOptionExt;
4444
use crate::raft_types::RaftLogId;
4545
use crate::runtime::RaftRuntime;
4646
use crate::storage::RaftSnapshotBuilder;
47+
use crate::storage::StorageHelper;
4748
use crate::timer::RaftTimer;
4849
use crate::timer::Timeout;
4950
use crate::versioned::Versioned;
@@ -177,7 +178,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
177178
async fn do_main(&mut self) -> Result<(), Fatal<C::NodeId>> {
178179
tracing::debug!("raft node is initializing");
179180

180-
let state = self.storage.get_initial_state().await?;
181+
let state = {
182+
let mut helper = StorageHelper::new(&mut self.storage);
183+
helper.get_initial_state().await?
184+
};
181185

182186
// TODO(xp): this is not necessary.
183187
self.storage.save_vote(&state.vote).await?;

openraft/src/engine/log_id_list.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::raft_types::RaftLogId;
2+
use crate::storage::StorageHelper;
23
use crate::LogId;
34
use crate::LogIdOptionExt;
45
use crate::NodeId;
@@ -40,14 +41,14 @@ impl<NID: NodeId> LogIdList<NID> {
4041
/// A-------B-------C : find(A,B); find(B,C) // both find `B`, need to de-dup
4142
/// A-------C-------C : find(A,C)
4243
/// ```
43-
pub async fn load_log_ids<C, Sto>(
44+
pub(crate) async fn load_log_ids<C, Sto>(
4445
last_purged_log_id: Option<LogId<NID>>,
4546
last_log_id: Option<LogId<NID>>,
46-
sto: &mut Sto,
47+
sto: &mut StorageHelper<'_, C, Sto>,
4748
) -> Result<LogIdList<NID>, StorageError<NID>>
4849
where
4950
C: RaftTypeConfig<NodeId = NID>,
50-
Sto: RaftStorage<C> + ?Sized,
51+
Sto: RaftStorage<C>,
5152
{
5253
let mut res = vec![];
5354

openraft/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub use crate::storage::RaftSnapshotBuilder;
8787
pub use crate::storage::RaftStorage;
8888
pub use crate::storage::RaftStorageDebug;
8989
pub use crate::storage::SnapshotMeta;
90+
pub use crate::storage::StorageHelper;
9091
pub use crate::storage_error::DefensiveError;
9192
pub use crate::storage_error::ErrorSubject;
9293
pub use crate::storage_error::ErrorVerb;

openraft/src/storage/helper.rs

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
use std::marker::PhantomData;
2+
use std::sync::Arc;
3+
4+
use crate::engine::LogIdList;
5+
use crate::EffectiveMembership;
6+
use crate::EntryPayload;
7+
use crate::LogId;
8+
use crate::LogIdOptionExt;
9+
use crate::MembershipState;
10+
use crate::RaftState;
11+
use crate::RaftStorage;
12+
use crate::RaftTypeConfig;
13+
use crate::StorageError;
14+
15+
/// StorageHelper provides additional methods to access a RaftStorage implementation.
16+
pub struct StorageHelper<'a, C, Sto>
17+
where
18+
C: RaftTypeConfig,
19+
Sto: RaftStorage<C>,
20+
{
21+
pub(crate) sto: &'a mut Sto,
22+
_p: PhantomData<C>,
23+
}
24+
25+
impl<'a, C, Sto> StorageHelper<'a, C, Sto>
26+
where
27+
C: RaftTypeConfig,
28+
Sto: RaftStorage<C>,
29+
{
30+
pub fn new(sto: &'a mut Sto) -> Self {
31+
Self {
32+
sto,
33+
_p: Default::default(),
34+
}
35+
}
36+
37+
/// Get Raft's state information from storage.
38+
///
39+
/// When the Raft node is first started, it will call this interface to fetch the last known state from stable
40+
/// storage.
41+
pub async fn get_initial_state(&mut self) -> Result<RaftState<C::NodeId>, StorageError<C::NodeId>> {
42+
let vote = self.sto.read_vote().await?;
43+
let st = self.sto.get_log_state().await?;
44+
let mut last_purged_log_id = st.last_purged_log_id;
45+
let mut last_log_id = st.last_log_id;
46+
let (last_applied, _) = self.sto.last_applied_state().await?;
47+
let mem_state = self.get_membership().await?;
48+
49+
// Clean up dirty state: snapshot is installed but logs are not cleaned.
50+
if last_log_id < last_applied {
51+
self.sto.purge_logs_upto(last_applied.unwrap()).await?;
52+
last_log_id = last_applied;
53+
last_purged_log_id = last_applied;
54+
}
55+
56+
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self).await?;
57+
58+
Ok(RaftState {
59+
last_applied,
60+
// The initial value for `vote` is the minimal possible value.
61+
// See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization)
62+
vote: vote.unwrap_or_default(),
63+
log_ids,
64+
membership_state: mem_state,
65+
66+
// -- volatile fields: they are not persisted.
67+
leader: None,
68+
committed: None,
69+
server_state: Default::default(),
70+
})
71+
}
72+
73+
/// Get the log id of the entry at `index`.
74+
pub async fn get_log_id(&mut self, log_index: u64) -> Result<LogId<C::NodeId>, StorageError<C::NodeId>> {
75+
let st = self.sto.get_log_state().await?;
76+
77+
if Some(log_index) == st.last_purged_log_id.index() {
78+
return Ok(st.last_purged_log_id.unwrap());
79+
}
80+
81+
let entries = self.sto.get_log_entries(log_index..=log_index).await?;
82+
83+
Ok(entries[0].log_id)
84+
}
85+
86+
/// Returns the last 2 membership config found in log or state machine.
87+
///
88+
/// A raft node needs to store at most 2 membership config log:
89+
/// - The first one must be committed, because raft allows to propose new membership only when the previous one is
90+
/// committed.
91+
/// - The second may be committed or not.
92+
///
93+
/// Because when handling append-entries RPC, (1) a raft follower will delete logs that are inconsistent with the
94+
/// leader,
95+
/// and (2) a membership will take effect at once it is written,
96+
/// a follower needs to revert the effective membership to a previous one.
97+
///
98+
/// And because (3) there is at most one outstanding, uncommitted membership log,
99+
/// a follower only need to revert at most one membership log.
100+
///
101+
/// Thus a raft node will only need to store at most two recent membership logs.
102+
pub async fn get_membership(&mut self) -> Result<MembershipState<C::NodeId>, StorageError<C::NodeId>> {
103+
let (_, sm_mem) = self.sto.last_applied_state().await?;
104+
105+
let sm_mem_next_index = sm_mem.log_id.next_index();
106+
107+
let log_mem = self.last_membership_in_log(sm_mem_next_index).await?;
108+
tracing::debug!(membership_in_sm=?sm_mem, membership_in_log=?log_mem, "RaftStorage::get_membership");
109+
110+
// There 2 membership configs in logs.
111+
if log_mem.len() == 2 {
112+
return Ok(MembershipState {
113+
committed: Arc::new(log_mem[0].clone()),
114+
effective: Arc::new(log_mem[1].clone()),
115+
});
116+
}
117+
118+
let effective = if log_mem.is_empty() {
119+
sm_mem.clone()
120+
} else {
121+
log_mem[0].clone()
122+
};
123+
124+
let res = MembershipState {
125+
committed: Arc::new(sm_mem),
126+
effective: Arc::new(effective),
127+
};
128+
129+
Ok(res)
130+
}
131+
132+
/// Get the last 2 membership configs found in the log.
133+
///
134+
/// This method returns at most membership logs with greatest log index which is `>=since_index`.
135+
/// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied.
136+
#[tracing::instrument(level = "trace", skip(self))]
137+
pub async fn last_membership_in_log(
138+
&mut self,
139+
since_index: u64,
140+
) -> Result<Vec<EffectiveMembership<C::NodeId>>, StorageError<C::NodeId>> {
141+
let st = self.sto.get_log_state().await?;
142+
143+
let mut end = st.last_log_id.next_index();
144+
let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index);
145+
let step = 64;
146+
147+
let mut res = vec![];
148+
149+
while start < end {
150+
let entries = self.sto.try_get_log_entries(start..end).await?;
151+
152+
for ent in entries.iter().rev() {
153+
if let EntryPayload::Membership(ref mem) = ent.payload {
154+
let em = EffectiveMembership::new(Some(ent.log_id), mem.clone());
155+
res.insert(0, em);
156+
if res.len() == 2 {
157+
return Ok(res);
158+
}
159+
}
160+
}
161+
162+
end = end.saturating_sub(step);
163+
}
164+
165+
Ok(res)
166+
}
167+
}

0 commit comments

Comments
 (0)