Skip to content

Commit c836355

Browse files
committed
Change: Membership.nodes remove Option from value
Before this commit, the value of `Membership.nodes` is `Option<N: Node>`: `Membership.nodes: BTreeMap<NID, Option<N>>` The value does not have to be an `Option`. If an application does not need openraft to store the `Node` data, it can just implement `trait Node` with an empty struct, or just use `BasicNode` as a placeholder. - Using `Option<N>` as the value is a legacy and since #480 is merged, we do not need the `Option` any more.
1 parent e9382ff commit c836355

21 files changed

+109
-252
lines changed

examples/raft-kv-memstore/src/network/management.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub async fn add_learner(
2828
) -> actix_web::Result<impl Responder> {
2929
let node_id = req.0 .0;
3030
let node = BasicNode { addr: req.0 .1.clone() };
31-
let res = app.raft.add_learner(node_id, Some(node), true).await;
31+
let res = app.raft.add_learner(node_id, node, true).await;
3232
Ok(Json(res))
3333
}
3434

examples/raft-kv-memstore/src/network/raft_network_impl.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl ExampleNetwork {
2626
pub async fn send_rpc<Req, Resp, Err>(
2727
&self,
2828
target: ExampleNodeId,
29-
target_node: Option<&BasicNode>,
29+
target_node: &BasicNode,
3030
uri: &str,
3131
req: Req,
3232
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
@@ -35,7 +35,7 @@ impl ExampleNetwork {
3535
Err: std::error::Error + DeserializeOwned,
3636
Resp: DeserializeOwned,
3737
{
38-
let addr = target_node.map(|x| &x.addr).unwrap();
38+
let addr = &target_node.addr;
3939

4040
let url = format!("http://{}/{}", addr, uri);
4141
let client = reqwest::Client::new();
@@ -57,20 +57,20 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
5757
async fn connect(
5858
&mut self,
5959
target: ExampleNodeId,
60-
node: Option<&BasicNode>,
60+
node: &BasicNode,
6161
) -> Result<Self::Network, Self::ConnectionError> {
6262
Ok(ExampleNetworkConnection {
6363
owner: ExampleNetwork {},
6464
target,
65-
target_node: node.cloned(),
65+
target_node: node.clone(),
6666
})
6767
}
6868
}
6969

7070
pub struct ExampleNetworkConnection {
7171
owner: ExampleNetwork,
7272
target: ExampleNodeId,
73-
target_node: Option<BasicNode>,
73+
target_node: BasicNode,
7474
}
7575

7676
#[async_trait]
@@ -82,7 +82,7 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
8282
AppendEntriesResponse<ExampleNodeId>,
8383
RPCError<ExampleNodeId, BasicNode, AppendEntriesError<ExampleNodeId>>,
8484
> {
85-
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-append", req).await
85+
self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await
8686
}
8787

8888
async fn send_install_snapshot(
@@ -92,13 +92,13 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
9292
InstallSnapshotResponse<ExampleNodeId>,
9393
RPCError<ExampleNodeId, BasicNode, InstallSnapshotError<ExampleNodeId>>,
9494
> {
95-
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-snapshot", req).await
95+
self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await
9696
}
9797

9898
async fn send_vote(
9999
&mut self,
100100
req: VoteRequest<ExampleNodeId>,
101101
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, VoteError<ExampleNodeId>>> {
102-
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-vote", req).await
102+
self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await
103103
}
104104
}

examples/raft-kv-memstore/tests/cluster/test_cluster.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ async fn test_cluster() -> anyhow::Result<()> {
9090
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
9191
assert_eq!(
9292
btreemap! {
93-
1 => Some(BasicNode::new("127.0.0.1:21001")),
94-
2 => Some(BasicNode::new("127.0.0.1:21002")),
95-
3 => Some(BasicNode::new("127.0.0.1:21003")),
93+
1 => BasicNode::new("127.0.0.1:21001"),
94+
2 => BasicNode::new("127.0.0.1:21002"),
95+
3 => BasicNode::new("127.0.0.1:21003"),
9696
},
9797
nodes_in_cluster
9898
);

examples/raft-kv-rocksdb/src/network/management.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub fn rest(app: &mut Server) {
3232
async fn add_learner(mut req: Request<Arc<ExampleApp>>) -> tide::Result {
3333
let (node_id, api_addr, rpc_addr): (ExampleNodeId, String, String) = req.body_json().await?;
3434
let node = ExampleNode { rpc_addr, api_addr };
35-
let res = req.state().raft.add_learner(node_id, Some(node), true).await;
35+
let res = req.state().raft.add_learner(node_id, node, true).await;
3636
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
3737
}
3838

examples/raft-kv-rocksdb/src/network/raft_network_impl.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
6868
async fn connect(
6969
&mut self,
7070
target: ExampleNodeId,
71-
node: Option<&ExampleNode>,
71+
node: &ExampleNode,
7272
) -> Result<Self::Network, Self::ConnectionError> {
7373
dbg!(&node);
74-
let addr = node.map(|x| format!("ws://{}", x.rpc_addr)).unwrap();
74+
let addr = format!("ws://{}", node.rpc_addr);
7575
let client = Client::dial_websocket(&addr).await.ok();
7676
Ok(ExampleNetworkConnection { addr, client, target })
7777
}

examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
9191
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
9292
assert_eq!(
9393
btreemap! {
94-
1 => Some(ExampleNode{rpc_addr: get_rpc_addr(1), api_addr: get_addr(1)}),
95-
2 => Some(ExampleNode{rpc_addr: get_rpc_addr(2), api_addr: get_addr(2)}),
96-
3 => Some(ExampleNode{rpc_addr: get_rpc_addr(3), api_addr: get_addr(3)}),
94+
1 => ExampleNode{rpc_addr: get_rpc_addr(1), api_addr: get_addr(1)},
95+
2 => ExampleNode{rpc_addr: get_rpc_addr(2), api_addr: get_addr(2)},
96+
3 => ExampleNode{rpc_addr: get_rpc_addr(3), api_addr: get_addr(3)},
9797
},
9898
nodes_in_cluster
9999
);

openraft/src/core/raft_core.rs

+10-13
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
380380
};
381381

382382
let my_id = self.id;
383-
let target_node = self.engine.state.membership_state.effective.get_node(&target).cloned();
384-
let mut network = match self.network.connect(target, target_node.as_ref()).await {
383+
let target_node = self.engine.state.membership_state.effective.get_node(&target).clone();
384+
let mut network = match self.network.connect(target, &target_node).await {
385385
Ok(n) => n,
386386
Err(e) => {
387387
tracing::error!(target = display(target), "{}", e);
@@ -479,7 +479,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
479479
pub(super) async fn add_learner(
480480
&mut self,
481481
target: C::NodeId,
482-
node: Option<C::Node>,
482+
node: C::Node,
483483
tx: RaftAddLearnerTx<C::NodeId, C::Node>,
484484
) -> Result<(), Fatal<C::NodeId>> {
485485
if let Some(l) = &self.leader_data {
@@ -517,7 +517,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
517517
}
518518

519519
// Ensure the node is connectable
520-
let conn_res = self.network.connect(target, node.clone().as_ref()).await;
520+
let conn_res = self.network.connect(target, &node).await;
521521
if let Err(e) = conn_res {
522522
let net_err = NetworkError::new(&anyerror::AnyError::new(&e));
523523
let _ = tx.send(Err(AddLearnerError::NetworkError(net_err)));
@@ -780,9 +780,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
780780
#[tracing::instrument(level = "debug", skip(self))]
781781
pub(crate) async fn handle_initialize(
782782
&mut self,
783-
member_nodes: BTreeMap<C::NodeId, Option<C::Node>>,
783+
member_nodes: BTreeMap<C::NodeId, C::Node>,
784784
) -> Result<(), InitializeError<C::NodeId, C::Node>> {
785-
let membership = Membership::try_from(member_nodes)?;
785+
let membership = Membership::from(member_nodes);
786786
let payload = EntryPayload::<C>::Membership(membership);
787787

788788
let mut entry_refs = [EntryRef::new(&payload)];
@@ -967,10 +967,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
967967
}
968968

969969
pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
970-
match leader_id {
971-
None => None,
972-
Some(id) => self.engine.state.membership_state.effective.get_node(&id).cloned(),
973-
}
970+
leader_id.map(|id| self.engine.state.membership_state.effective.get_node(&id).clone())
974971
}
975972

976973
#[tracing::instrument(level = "debug", skip_all)]
@@ -1071,7 +1068,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
10711068

10721069
Ok(ReplicationCore::<C, N, S>::spawn(
10731070
target,
1074-
target_node.cloned(),
1071+
target_node.clone(),
10751072
self.engine.state.vote,
10761073
self.config.clone(),
10771074
self.engine.state.last_log_id(),
@@ -1259,8 +1256,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
12591256
}
12601257

12611258
let req = vote_req.clone();
1262-
let target_node = self.engine.state.membership_state.effective.get_node(&target).cloned();
1263-
let mut network = match self.network.connect(target, target_node.as_ref()).await {
1259+
let target_node = self.engine.state.membership_state.effective.get_node(&target).clone();
1260+
let mut network = match self.network.connect(target, &target_node).await {
12641261
Ok(n) => n,
12651262
Err(err) => {
12661263
tracing::error!({error=%err, target=display(target)}, "while requesting vote");

openraft/src/error.rs

+1
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ pub struct NotAllowed<NID: NodeId> {
466466
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
467467
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
468468
#[error("node {node_id} {reason}")]
469+
// TODO: remove it
469470
pub struct MissingNodeInfo<NID: NodeId> {
470471
pub node_id: NID,
471472
pub reason: String,

openraft/src/membership/effective_membership.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,12 @@ where
151151
}
152152

153153
/// Get a the node(either voter or learner) by node id.
154-
pub fn get_node(&self, node_id: &NID) -> Option<&N> {
154+
pub fn get_node(&self, node_id: &NID) -> &N {
155155
self.membership.get_node(node_id)
156156
}
157157

158158
/// Returns an Iterator of all nodes(voters and learners).
159-
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &Option<N>)> {
159+
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &N)> {
160160
self.membership.nodes()
161161
}
162162

openraft/src/membership/membership.rs

+19-51
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,21 @@ use crate::quorum::QuorumSet;
1414
use crate::MessageSummary;
1515
use crate::NodeId;
1616

17-
/// BTreeMap for mapping node-id the node.
18-
pub type NodeMap<NID, N> = BTreeMap<NID, Option<N>>;
1917
/// Convert other types into the internal data structure for node infos
2018
pub trait IntoOptionNodes<NID, N>
2119
where
2220
N: Node,
2321
NID: NodeId,
2422
{
25-
fn into_option_nodes(self) -> NodeMap<NID, N>;
23+
fn into_option_nodes(self) -> BTreeMap<NID, N>;
2624
}
2725

2826
impl<NID, N> IntoOptionNodes<NID, N> for ()
2927
where
3028
N: Node,
3129
NID: NodeId,
3230
{
33-
fn into_option_nodes(self) -> NodeMap<NID, N> {
31+
fn into_option_nodes(self) -> BTreeMap<NID, N> {
3432
btreemap! {}
3533
}
3634
}
@@ -40,8 +38,8 @@ where
4038
N: Node,
4139
NID: NodeId,
4240
{
43-
fn into_option_nodes(self) -> NodeMap<NID, N> {
44-
self.into_iter().map(|node_id| (node_id, None)).collect()
41+
fn into_option_nodes(self) -> BTreeMap<NID, N> {
42+
self.into_iter().map(|node_id| (node_id, N::default())).collect()
4543
}
4644
}
4745

@@ -50,17 +48,7 @@ where
5048
N: Node,
5149
NID: NodeId,
5250
{
53-
fn into_option_nodes(self) -> NodeMap<NID, N> {
54-
self.into_iter().map(|(node_id, n)| (node_id, Some(n))).collect()
55-
}
56-
}
57-
58-
impl<NID, N> IntoOptionNodes<NID, N> for NodeMap<NID, N>
59-
where
60-
N: Node,
61-
NID: NodeId,
62-
{
63-
fn into_option_nodes(self) -> NodeMap<NID, N> {
51+
fn into_option_nodes(self) -> BTreeMap<NID, N> {
6452
self
6553
}
6654
}
@@ -84,22 +72,19 @@ where
8472
/// Additional info of all nodes, e.g., the connecting host and port.
8573
///
8674
/// A node-id key that is in `nodes` but is not in `configs` is a **learner**.
87-
/// The values in this map must all be `Some` or `None`.
88-
nodes: BTreeMap<NID, Option<N>>,
75+
nodes: BTreeMap<NID, N>,
8976
}
9077

91-
impl<NID, N> TryFrom<BTreeMap<NID, Option<N>>> for Membership<NID, N>
78+
impl<NID, N> From<BTreeMap<NID, N>> for Membership<NID, N>
9279
where
9380
N: Node,
9481
NID: NodeId,
9582
{
96-
type Error = MissingNodeInfo<NID>;
97-
98-
fn try_from(b: BTreeMap<NID, Option<N>>) -> Result<Self, Self::Error> {
83+
fn from(b: BTreeMap<NID, N>) -> Self {
9984
let member_ids = b.keys().cloned().collect::<BTreeSet<NID>>();
10085

101-
let membership = Membership::with_nodes(vec![member_ids], b)?;
102-
Ok(membership)
86+
// Safe unwrap: every node-id in `member_ids` present in `b`.
87+
Membership::with_nodes(vec![member_ids], b).unwrap()
10388
}
10489
}
10590

@@ -122,9 +107,8 @@ where
122107
}
123108
res.push(format!("{}", node_id));
124109

125-
if let Some(n) = self.get_node(node_id) {
126-
res.push(format!(":{{{}}}", n));
127-
}
110+
let n = self.get_node(node_id);
111+
res.push(format!(":{{{}}}", n));
128112
}
129113
res.push("}".to_string());
130114
}
@@ -141,9 +125,8 @@ where
141125

142126
res.push(format!("{}", learner_id));
143127

144-
if let Some(n) = self.get_node(learner_id) {
145-
res.push(format!(":{{{}}}", n));
146-
}
128+
let n = self.get_node(learner_id);
129+
res.push(format!(":{{{}}}", n));
147130
}
148131
res.push("]".to_string());
149132
res.join("")
@@ -194,28 +177,14 @@ where
194177
}
195178
}
196179

197-
let has_some = nodes.values().any(|x| x.is_some());
198-
if has_some {
199-
let first_none = nodes.iter().find(|(_node_id, v)| v.is_none());
200-
if let Some(first_none) = first_none {
201-
return Err(MissingNodeInfo {
202-
node_id: *first_none.0,
203-
reason: "is None".to_string(),
204-
});
205-
}
206-
}
207-
208180
Ok(Membership { configs, nodes })
209181
}
210182

211183
/// Extends nodes btreemap with another.
212184
///
213185
/// Node that present in `old` will **NOT** be replaced because changing the address of a node potentially breaks
214186
/// consensus guarantee.
215-
pub(crate) fn extend_nodes(
216-
old: BTreeMap<NID, Option<N>>,
217-
new: &BTreeMap<NID, Option<N>>,
218-
) -> BTreeMap<NID, Option<N>> {
187+
pub(crate) fn extend_nodes(old: BTreeMap<NID, N>, new: &BTreeMap<NID, N>) -> BTreeMap<NID, N> {
219188
let mut res = old;
220189

221190
for (k, v) in new.iter() {
@@ -233,7 +202,7 @@ where
233202
self.configs.len() > 1
234203
}
235204

236-
pub(crate) fn add_learner(&self, node_id: NID, node: Option<N>) -> Result<Self, MissingNodeInfo<NID>> {
205+
pub(crate) fn add_learner(&self, node_id: NID, node: N) -> Result<Self, MissingNodeInfo<NID>> {
237206
let configs = self.configs.clone();
238207

239208
let nodes = Self::extend_nodes(self.nodes.clone(), &btreemap! {node_id=>node});
@@ -289,13 +258,12 @@ where
289258
}
290259

291260
/// Get a the node(either voter or learner) by node id.
292-
pub(crate) fn get_node(&self, node_id: &NID) -> Option<&N> {
293-
let x = self.nodes.get(node_id)?;
294-
x.as_ref()
261+
pub(crate) fn get_node(&self, node_id: &NID) -> &N {
262+
&self.nodes[node_id]
295263
}
296264

297265
/// Returns an Iterator of all nodes(voters and learners).
298-
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &Option<N>)> {
266+
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &N)> {
299267
self.nodes.iter()
300268
}
301269

0 commit comments

Comments
 (0)