Skip to content

Commit 0a1dd3d

Browse files
committed
Change: replace EffectiveMembership with StoredMembership in RaftStorage
`EffectiveMembership` is a struct used at runtime, which contains additional information such as an optimized `QuorumSet` implementation that has different structure from a `Membership`. To better separate concerns, a new struct called `StoredMembership` has been introduced specifically for storage purpose. It contains only the information that needs to be stored in storage. Therefore, `StoredMembership` is used instead of `EffectiveMembership` in RaftStorage. Upgrade tip: Replace `EffectiveMembership` with `StoredMembership` in an application. Fields in `EffectiveMembership` are made private and can be accessed via corresponding methods such as: `EffectiveMembership.log_id` and `EffectiveMembership.membership` should be replaced with `EffectiveMembership::log_id()` and `EffectiveMembership::membership()`.
1 parent 1cc2198 commit 0a1dd3d

38 files changed

+351
-261
lines changed

examples/raft-kv-memstore/src/store/mod.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use openraft::storage::LogState;
1010
use openraft::storage::Snapshot;
1111
use openraft::AnyError;
1212
use openraft::BasicNode;
13-
use openraft::EffectiveMembership;
1413
use openraft::Entry;
1514
use openraft::EntryPayload;
1615
use openraft::ErrorSubject;
@@ -22,6 +21,7 @@ use openraft::RaftStorage;
2221
use openraft::SnapshotMeta;
2322
use openraft::StorageError;
2423
use openraft::StorageIOError;
24+
use openraft::StoredMembership;
2525
use openraft::Vote;
2626
use serde::Deserialize;
2727
use serde::Serialize;
@@ -73,7 +73,7 @@ pub struct ExampleStateMachine {
7373
pub last_applied_log: Option<LogId<ExampleNodeId>>,
7474

7575
// TODO: it should not be Option.
76-
pub last_membership: EffectiveMembership<ExampleNodeId, BasicNode>,
76+
pub last_membership: StoredMembership<ExampleNodeId, BasicNode>,
7777

7878
/// Application data.
7979
pub data: BTreeMap<String, String>,
@@ -250,13 +250,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
250250

251251
async fn last_applied_state(
252252
&mut self,
253-
) -> Result<
254-
(
255-
Option<LogId<ExampleNodeId>>,
256-
EffectiveMembership<ExampleNodeId, BasicNode>,
257-
),
258-
StorageError<ExampleNodeId>,
259-
> {
253+
) -> Result<(Option<LogId<ExampleNodeId>>, StoredMembership<ExampleNodeId, BasicNode>), StorageError<ExampleNodeId>>
254+
{
260255
let state_machine = self.state_machine.read().await;
261256
Ok((state_machine.last_applied_log, state_machine.last_membership.clone()))
262257
}
@@ -286,7 +281,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
286281
}
287282
},
288283
EntryPayload::Membership(ref mem) => {
289-
sm.last_membership = EffectiveMembership::new(Some(entry.log_id), mem.clone());
284+
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
290285
res.push(ExampleResponse { value: None })
291286
}
292287
};

examples/raft-kv-memstore/tests/cluster/test_cluster.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ async fn test_cluster() -> anyhow::Result<()> {
126126
println!("=== metrics after add-learner");
127127
let x = client.metrics().await?;
128128

129-
assert_eq!(&vec![vec![1]], x.membership_config.get_joint_config());
129+
assert_eq!(&vec![btreeset![1]], x.membership_config.membership().get_joint_config());
130130

131131
let nodes_in_cluster =
132132
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
@@ -161,7 +161,10 @@ async fn test_cluster() -> anyhow::Result<()> {
161161

162162
println!("=== metrics after change-member");
163163
let x = client.metrics().await?;
164-
assert_eq!(&vec![vec![1, 2, 3]], x.membership_config.get_joint_config());
164+
assert_eq!(
165+
&vec![btreeset![1, 2, 3]],
166+
x.membership_config.membership().get_joint_config()
167+
);
165168

166169
// --- Try to write some application data through the leader.
167170

@@ -246,7 +249,7 @@ async fn test_cluster() -> anyhow::Result<()> {
246249

247250
println!("=== metrics after change-membership to {{3}}");
248251
let x = client.metrics().await?;
249-
assert_eq!(&vec![vec![3]], x.membership_config.get_joint_config());
252+
assert_eq!(&vec![btreeset![3]], x.membership_config.membership().get_joint_config());
250253

251254
println!("=== write `foo=zoo` to node-3");
252255
let _x = client3

examples/raft-kv-rocksdb/src/store.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use openraft::async_trait::async_trait;
1414
use openraft::storage::LogState;
1515
use openraft::storage::Snapshot;
1616
use openraft::AnyError;
17-
use openraft::EffectiveMembership;
1817
use openraft::Entry;
1918
use openraft::EntryPayload;
2019
use openraft::ErrorSubject;
@@ -26,6 +25,7 @@ use openraft::RaftStorage;
2625
use openraft::SnapshotMeta;
2726
use openraft::StorageError;
2827
use openraft::StorageIOError;
28+
use openraft::StoredMembership;
2929
use openraft::Vote;
3030
use rocksdb::ColumnFamily;
3131
use rocksdb::ColumnFamilyDescriptor;
@@ -82,7 +82,7 @@ pub struct SerializableExampleStateMachine {
8282
pub last_applied_log: Option<LogId<ExampleNodeId>>,
8383

8484
// TODO: it should not be Option.
85-
pub last_membership: EffectiveMembership<ExampleNodeId, ExampleNode>,
85+
pub last_membership: StoredMembership<ExampleNodeId, ExampleNode>,
8686

8787
/// Application data.
8888
pub data: BTreeMap<String, String>,
@@ -127,7 +127,7 @@ fn sm_w_err<E: Error + 'static>(e: E) -> StorageError<ExampleNodeId> {
127127
}
128128

129129
impl ExampleStateMachine {
130-
fn get_last_membership(&self) -> StorageResult<EffectiveMembership<ExampleNodeId, ExampleNode>> {
130+
fn get_last_membership(&self) -> StorageResult<StoredMembership<ExampleNodeId, ExampleNode>> {
131131
self.db
132132
.get_cf(
133133
self.db.cf_handle("state_machine").expect("cf_handle"),
@@ -137,10 +137,10 @@ impl ExampleStateMachine {
137137
.and_then(|value| {
138138
value
139139
.map(|v| serde_json::from_slice(&v).map_err(sm_r_err))
140-
.unwrap_or_else(|| Ok(EffectiveMembership::default()))
140+
.unwrap_or_else(|| Ok(StoredMembership::default()))
141141
})
142142
}
143-
fn set_last_membership(&self, membership: EffectiveMembership<ExampleNodeId, ExampleNode>) -> StorageResult<()> {
143+
fn set_last_membership(&self, membership: StoredMembership<ExampleNodeId, ExampleNode>) -> StorageResult<()> {
144144
self.db
145145
.put_cf(
146146
self.db.cf_handle("state_machine").expect("cf_handle"),
@@ -484,7 +484,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
484484
) -> Result<
485485
(
486486
Option<LogId<ExampleNodeId>>,
487-
EffectiveMembership<ExampleNodeId, ExampleNode>,
487+
StoredMembership<ExampleNodeId, ExampleNode>,
488488
),
489489
StorageError<ExampleNodeId>,
490490
> {
@@ -520,7 +520,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
520520
}
521521
},
522522
EntryPayload::Membership(ref mem) => {
523-
sm.set_last_membership(EffectiveMembership::new(Some(entry.log_id), mem.clone()))?;
523+
sm.set_last_membership(StoredMembership::new(Some(entry.log_id), mem.clone()))?;
524+
524525
res.push(ExampleResponse { value: None })
525526
}
526527
};

examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
120120
println!("=== metrics after add-learner");
121121
let x = leader.metrics().await?;
122122

123-
assert_eq!(&vec![vec![1]], x.membership_config.get_joint_config());
123+
assert_eq!(&vec![btreeset![1]], x.membership_config.membership().get_joint_config());
124124

125125
let nodes_in_cluster =
126126
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
@@ -155,7 +155,10 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
155155

156156
println!("=== metrics after change-member");
157157
let x = leader.metrics().await?;
158-
assert_eq!(&vec![vec![1, 2, 3]], x.membership_config.get_joint_config());
158+
assert_eq!(
159+
&vec![btreeset![1, 2, 3]],
160+
x.membership_config.membership().get_joint_config()
161+
);
159162

160163
// --- Try to write some application data through the leader.
161164

memstore/src/lib.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use openraft::storage::RaftLogReader;
1414
use openraft::storage::RaftSnapshotBuilder;
1515
use openraft::storage::Snapshot;
1616
use openraft::AnyError;
17-
use openraft::EffectiveMembership;
1817
use openraft::Entry;
1918
use openraft::EntryPayload;
2019
use openraft::ErrorSubject;
@@ -25,6 +24,7 @@ use openraft::RaftStorageDebug;
2524
use openraft::SnapshotMeta;
2625
use openraft::StorageError;
2726
use openraft::StorageIOError;
27+
use openraft::StoredMembership;
2828
use openraft::Vote;
2929
use serde::Deserialize;
3030
use serde::Serialize;
@@ -88,7 +88,7 @@ pub struct MemStoreSnapshot {
8888
pub struct MemStoreStateMachine {
8989
pub last_applied_log: Option<LogId<MemNodeId>>,
9090

91-
pub last_membership: EffectiveMembership<MemNodeId, ()>,
91+
pub last_membership: StoredMembership<MemNodeId, ()>,
9292

9393
/// A mapping of client IDs to their state info.
9494
pub client_serial_responses: HashMap<String, (u64, Option<String>)>,
@@ -259,7 +259,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
259259

260260
async fn last_applied_state(
261261
&mut self,
262-
) -> Result<(Option<LogId<MemNodeId>>, EffectiveMembership<MemNodeId, ()>), StorageError<MemNodeId>> {
262+
) -> Result<(Option<LogId<MemNodeId>>, StoredMembership<MemNodeId, ()>), StorageError<MemNodeId>> {
263263
let sm = self.sm.read().await;
264264
Ok((sm.last_applied_log, sm.last_membership.clone()))
265265
}
@@ -339,7 +339,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
339339
res.push(ClientResponse(previous));
340340
}
341341
EntryPayload::Membership(ref mem) => {
342-
sm.last_membership = EffectiveMembership::new(Some(entry.log_id), mem.clone());
342+
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
343343
res.push(ClientResponse(None))
344344
}
345345
};

openraft/src/core/raft_core.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
345345
// request failures.
346346

347347
let _ = tx.send(Err(QuorumNotEnough {
348-
cluster: self.engine.state.membership_state.effective().membership.summary(),
348+
cluster: self.engine.state.membership_state.effective().membership().summary(),
349349
got: granted,
350350
}
351351
.into()));
@@ -472,7 +472,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
472472
// --- cluster ---
473473
state: self.engine.state.server_state,
474474
current_leader: self.current_leader(),
475-
membership_config: self.engine.state.membership_state.effective().clone(),
475+
membership_config: self.engine.state.membership_state.effective().stored_membership().clone(),
476476

477477
// --- replication ---
478478
replication,
@@ -773,10 +773,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
773773
// Safe unwrap(): target must be in membership
774774
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap();
775775

776-
let membership_log_id = self.engine.state.membership_state.effective().log_id;
776+
let membership_log_id = self.engine.state.membership_state.effective().log_id();
777777
let network = self.network.new_client(target, target_node).await;
778778

779-
let session_id = ReplicationSessionId::new(*self.engine.state.get_vote(), membership_log_id);
779+
let session_id = ReplicationSessionId::new(*self.engine.state.get_vote(), *membership_log_id);
780780

781781
ReplicationCore::<C, N, S>::spawn(
782782
target,
@@ -1234,11 +1234,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
12341234
return false;
12351235
}
12361236

1237-
if session_id.membership_log_id != self.engine.state.membership_state.effective().log_id {
1237+
if &session_id.membership_log_id != self.engine.state.membership_state.effective().log_id() {
12381238
tracing::warn!(
12391239
"membership_log_id changed: msg sent by: {}; curr: {}; ignore when ({})",
12401240
session_id.membership_log_id.summary(),
1241-
self.engine.state.membership_state.effective().log_id.summary(),
1241+
self.engine.state.membership_state.effective().log_id().summary(),
12421242
msg
12431243
);
12441244
return false;

openraft/src/engine/engine_impl.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ where
433433
);
434434

435435
#[allow(clippy::collapsible_if)]
436-
if em.log_id.as_ref() <= self.state.committed() {
436+
if em.log_id().as_ref() <= self.state.committed() {
437437
if !em.is_voter(&self.config.id) && self.state.is_leading(&self.config.id) {
438438
tracing::debug!("leader {} is stepping down", self.config.id);
439439
self.vote_handler().become_following();

0 commit comments

Comments
 (0)