Skip to content

Commit 8b59966

Browse files
committed
change: MembershipConfig.member type is changed form HashSet BTreeSet
1 parent 0d6f6de commit 8b59966

31 files changed

+198
-195
lines changed

async-raft/src/core/admin.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::BTreeSet;
12
use std::collections::HashSet;
23

34
use futures::future::FutureExt;
@@ -29,7 +30,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
2930
#[tracing::instrument(level = "trace", skip(self))]
3031
pub(super) async fn handle_init_with_config(
3132
&mut self,
32-
mut members: HashSet<NodeId>,
33+
mut members: BTreeSet<NodeId>,
3334
) -> Result<(), InitializeError> {
3435
if self.core.last_log_id.index != 0 || self.core.current_term != 0 {
3536
tracing::error!({self.core.last_log_id.index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0");
@@ -97,7 +98,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
9798
}
9899

99100
#[tracing::instrument(level = "trace", skip(self, tx))]
100-
pub(super) async fn change_membership(&mut self, members: HashSet<NodeId>, tx: ChangeMembershipTx) {
101+
pub(super) async fn change_membership(&mut self, members: BTreeSet<NodeId>, tx: ChangeMembershipTx) {
101102
// Ensure cluster will have at least one node.
102103
if members.is_empty() {
103104
let _ = tx.send(Err(ChangeConfigError::InoperableConfig));
@@ -219,7 +220,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
219220
.all_nodes()
220221
.into_iter()
221222
.filter(|elem| elem != &self.core.id)
222-
.collect::<HashSet<_>>();
223+
.collect::<BTreeSet<_>>();
223224

224225
let old_node_ids = self.core.membership.members.clone();
225226
let node_ids_to_add = new_node_ids.difference(&old_node_ids);

async-raft/src/core/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub(crate) mod replication;
88
mod vote;
99

1010
use std::collections::BTreeMap;
11+
use std::collections::BTreeSet;
1112
use std::collections::HashSet;
1213
use std::sync::Arc;
1314

@@ -778,7 +779,7 @@ pub enum ConsensusState {
778779
/// The set of non-voters nodes which are still being synced.
779780
awaiting: HashSet<NodeId>,
780781
/// The full membership change which has been proposed.
781-
members: HashSet<NodeId>,
782+
members: BTreeSet<NodeId>,
782783
/// The response channel to use once the consensus state is back into uniform state.
783784
tx: ChangeMembershipTx,
784785
},

async-raft/src/core/replication.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashSet;
1+
use std::collections::BTreeSet;
22

33
use tokio::sync::oneshot;
44

@@ -221,7 +221,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
221221
std::cmp::min(c0_index, c1_index)
222222
}
223223

224-
fn calc_members_commit_index(&self, mem: &HashSet<NodeId>, msg: &str) -> u64 {
224+
fn calc_members_commit_index(&self, mem: &BTreeSet<NodeId>, msg: &str) -> u64 {
225225
let log_ids = self.get_match_log_ids(mem);
226226
tracing::debug!("{} matched log_ids: {:?}", msg, log_ids);
227227

@@ -232,7 +232,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
232232
}
233233

234234
/// Extract the matching index/term of the replication state of specified nodes.
235-
fn get_match_log_ids(&self, node_ids: &HashSet<NodeId>) -> Vec<LogId> {
235+
fn get_match_log_ids(&self, node_ids: &BTreeSet<NodeId>) -> Vec<LogId> {
236236
tracing::debug!("to get match log ids of nodes: {:?}", node_ids);
237237

238238
let mut rst = Vec::with_capacity(node_ids.len());

async-raft/src/metrics.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
//! Metrics are observed on a running Raft node via the `Raft::metrics()` method, which will
88
//! return a stream of metrics.
99
10+
use std::collections::BTreeSet;
1011
use std::collections::HashMap;
11-
use std::collections::HashSet;
1212

1313
use serde::Deserialize;
1414
use serde::Serialize;
@@ -171,7 +171,7 @@ impl Wait {
171171

172172
/// Wait for `membership_config.members` to become expected node set or timeout.
173173
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
174-
pub async fn members(&self, want_members: HashSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
174+
pub async fn members(&self, want_members: BTreeSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
175175
self.metrics(
176176
|x| x.membership_config.members == want_members,
177177
&format!("{} .membership_config.members -> {:?}", msg.to_string(), want_members),
@@ -183,7 +183,7 @@ impl Wait {
183183
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
184184
pub async fn next_members(
185185
&self,
186-
want_members: Option<HashSet<NodeId>>,
186+
want_members: Option<BTreeSet<NodeId>>,
187187
msg: impl ToString,
188188
) -> Result<RaftMetrics, WaitError> {
189189
self.metrics(

async-raft/src/metrics_wait_test.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::Duration;
22

3-
use maplit::hashset;
3+
use maplit::btreeset;
44
use tokio::sync::watch;
55
use tokio::time::sleep;
66

@@ -73,14 +73,14 @@ async fn test_wait() -> anyhow::Result<()> {
7373
let h = tokio::spawn(async move {
7474
sleep(Duration::from_millis(10)).await;
7575
let mut update = init.clone();
76-
update.membership_config.members = hashset![1, 2];
76+
update.membership_config.members = btreeset![1, 2];
7777
let rst = tx.send(update);
7878
assert!(rst.is_ok());
7979
});
80-
let got = w.members(hashset![1, 2], "members").await?;
80+
let got = w.members(btreeset![1, 2], "members").await?;
8181
h.await?;
8282

83-
assert_eq!(hashset![1, 2], got.membership_config.members);
83+
assert_eq!(btreeset![1, 2], got.membership_config.members);
8484
}
8585

8686
{
@@ -90,14 +90,14 @@ async fn test_wait() -> anyhow::Result<()> {
9090
let h = tokio::spawn(async move {
9191
sleep(Duration::from_millis(10)).await;
9292
let mut update = init.clone();
93-
update.membership_config.members_after_consensus = Some(hashset![1, 2]);
93+
update.membership_config.members_after_consensus = Some(btreeset![1, 2]);
9494
let rst = tx.send(update);
9595
assert!(rst.is_ok());
9696
});
97-
let got = w.next_members(Some(hashset![1, 2]), "next_members").await?;
97+
let got = w.next_members(Some(btreeset![1, 2]), "next_members").await?;
9898
h.await?;
9999

100-
assert_eq!(Some(hashset![1, 2]), got.membership_config.members_after_consensus);
100+
assert_eq!(Some(btreeset![1, 2]), got.membership_config.members_after_consensus);
101101
}
102102

103103
tracing::info!("--- wait for snapshot, Ok");

async-raft/src/raft.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Public Raft interface and data types.
22
3-
use std::collections::HashSet;
3+
use std::collections::BTreeSet;
44
use std::sync::Arc;
55
use std::time::Duration;
66

@@ -214,7 +214,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
214214
/// free, and Raft guarantees that the first node to become the cluster leader will propagate
215215
/// only its own config.
216216
#[tracing::instrument(level = "debug", skip(self))]
217-
pub async fn initialize(&self, members: HashSet<NodeId>) -> Result<(), InitializeError> {
217+
pub async fn initialize(&self, members: BTreeSet<NodeId>) -> Result<(), InitializeError> {
218218
let (tx, rx) = oneshot::channel();
219219
self.inner.tx_api.send(RaftMsg::Initialize { members, tx }).map_err(|_| RaftError::ShuttingDown)?;
220220
rx.await.map_err(|_| InitializeError::RaftError(RaftError::ShuttingDown)).and_then(|res| res)
@@ -251,7 +251,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
251251
/// If this Raft node is not the cluster leader, then the proposed configuration change will be
252252
/// rejected.
253253
#[tracing::instrument(level = "debug", skip(self))]
254-
pub async fn change_membership(&self, members: HashSet<NodeId>) -> Result<(), ChangeConfigError> {
254+
pub async fn change_membership(&self, members: BTreeSet<NodeId>) -> Result<(), ChangeConfigError> {
255255
let (tx, rx) = oneshot::channel();
256256
self.inner
257257
.tx_api
@@ -339,15 +339,15 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
339339
tx: ClientReadResponseTx,
340340
},
341341
Initialize {
342-
members: HashSet<NodeId>,
342+
members: BTreeSet<NodeId>,
343343
tx: oneshot::Sender<Result<(), InitializeError>>,
344344
},
345345
AddNonVoter {
346346
id: NodeId,
347347
tx: ChangeMembershipTx,
348348
},
349349
ChangeMembership {
350-
members: HashSet<NodeId>,
350+
members: BTreeSet<NodeId>,
351351
tx: ChangeMembershipTx,
352352
},
353353
}
@@ -484,16 +484,16 @@ pub struct EntrySnapshotPointer {
484484
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
485485
pub struct MembershipConfig {
486486
/// All members of the Raft cluster.
487-
pub members: HashSet<NodeId>,
487+
pub members: BTreeSet<NodeId>,
488488
/// All members of the Raft cluster after joint consensus is finalized.
489489
///
490490
/// The presence of a value here indicates that the config is in joint consensus.
491-
pub members_after_consensus: Option<HashSet<NodeId>>,
491+
pub members_after_consensus: Option<BTreeSet<NodeId>>,
492492
}
493493

494494
impl MembershipConfig {
495495
/// Get an iterator over all nodes in the current config.
496-
pub fn all_nodes(&self) -> HashSet<u64> {
496+
pub fn all_nodes(&self) -> BTreeSet<u64> {
497497
let mut all = self.members.clone();
498498
if let Some(members) = &self.members_after_consensus {
499499
all.extend(members);
@@ -520,7 +520,7 @@ impl MembershipConfig {
520520

521521
/// Create a new initial config containing only the given node ID.
522522
pub fn new_initial(id: NodeId) -> Self {
523-
let mut members = HashSet::new();
523+
let mut members = BTreeSet::new();
524524
members.insert(id);
525525
Self {
526526
members,

async-raft/tests/add_remove_voter.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashSet;
1+
use std::collections::BTreeSet;
22
use std::sync::Arc;
33
use std::time::Duration;
44

@@ -7,7 +7,7 @@ use async_raft::Config;
77
use async_raft::State;
88
use fixtures::RaftRouter;
99
use futures::stream::StreamExt;
10-
use maplit::hashset;
10+
use maplit::btreeset;
1111

1212
mod fixtures;
1313

@@ -19,7 +19,7 @@ mod fixtures;
1919
/// - add 4 non-voter as follower.
2020
/// - asserts that the leader was able to successfully commit logs and that the followers has successfully replicated
2121
/// the payload.
22-
/// - remove one folower: node-4
22+
/// - remove one follower: node-4
2323
/// - asserts node-4 becomes non-voter and the leader stops sending logs to it.
2424
///
2525
/// RUST_LOG=async_raft,memstore,add_remove_voter=trace cargo test -p async-raft --test add_remove_voter
@@ -28,8 +28,8 @@ async fn add_remove_voter() -> Result<()> {
2828
fixtures::init_tracing();
2929

3030
let timeout = Duration::from_millis(500);
31-
let all_members = hashset![0, 1, 2, 3, 4];
32-
let left_members = hashset![0, 1, 2, 3];
31+
let all_members = btreeset![0, 1, 2, 3, 4];
32+
let left_members = btreeset![0, 1, 2, 3];
3333

3434
// Setup test dependencies.
3535
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
@@ -62,7 +62,7 @@ async fn add_remove_voter() -> Result<()> {
6262
router.initialize_from_single_node(0).await?;
6363
want = 1;
6464

65-
wait_log(router.clone(), &hashset![0], want).await?;
65+
wait_log(router.clone(), &btreeset![0], want).await?;
6666
router.assert_stable_cluster(Some(1), Some(want)).await;
6767

6868
// Sync some new nodes.
@@ -123,7 +123,7 @@ async fn add_remove_voter() -> Result<()> {
123123
Ok(())
124124
}
125125

126-
async fn wait_log(router: std::sync::Arc<fixtures::RaftRouter>, node_ids: &HashSet<u64>, want_log: u64) -> Result<()> {
126+
async fn wait_log(router: std::sync::Arc<fixtures::RaftRouter>, node_ids: &BTreeSet<u64>, want_log: u64) -> Result<()> {
127127
let timeout = Duration::from_millis(500);
128128
for i in node_ids.iter() {
129129
router

async-raft/tests/api_install_snapshot.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use async_raft::LogId;
99
use async_raft::SnapshotMeta;
1010
use async_raft::State;
1111
use fixtures::RaftRouter;
12-
use maplit::hashset;
12+
use maplit::btreeset;
1313

1414
/// API test: install_snapshot with various condition.
1515
///
@@ -33,13 +33,13 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
3333
{
3434
router.new_raft_node(0).await;
3535

36-
router.wait_for_log(&hashset![0], want, None, "empty").await?;
37-
router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?;
36+
router.wait_for_log(&btreeset![0], want, None, "empty").await?;
37+
router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?;
3838

3939
router.initialize_from_single_node(0).await?;
4040
want += 1;
4141

42-
router.wait_for_log(&hashset![0], want, None, "init leader").await?;
42+
router.wait_for_log(&btreeset![0], want, None, "init leader").await?;
4343
router.assert_stable_cluster(Some(1), Some(want)).await;
4444
}
4545

async-raft/tests/client_reads.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use anyhow::Result;
66
use async_raft::Config;
77
use async_raft::State;
88
use fixtures::RaftRouter;
9-
use maplit::hashset;
9+
use maplit::btreeset;
1010

1111
/// Client read tests.
1212
///
@@ -31,16 +31,16 @@ async fn client_reads() -> Result<()> {
3131
let mut want = 0;
3232

3333
// Assert all nodes are in non-voter state & have no entries.
34-
router.wait_for_log(&hashset![0, 1, 2], want, None, "empty node").await?;
35-
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty node").await?;
34+
router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty node").await?;
35+
router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty node").await?;
3636
router.assert_pristine_cluster().await;
3737

3838
// Initialize the cluster, then assert that a stable cluster was formed & held.
3939
tracing::info!("--- initializing cluster");
4040
router.initialize_from_single_node(0).await?;
4141
want += 1;
4242

43-
router.wait_for_log(&hashset![0, 1, 2], want, None, "init leader").await?;
43+
router.wait_for_log(&btreeset![0, 1, 2], want, None, "init leader").await?;
4444
router.assert_stable_cluster(Some(1), Some(1)).await;
4545

4646
// Get the ID of the leader, and assert that client_read succeeds.

async-raft/tests/client_writes.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use async_raft::LogId;
77
use async_raft::State;
88
use fixtures::RaftRouter;
99
use futures::prelude::*;
10-
use maplit::hashset;
10+
use maplit::btreeset;
1111

1212
mod fixtures;
1313

@@ -34,17 +34,17 @@ async fn client_writes() -> Result<()> {
3434
let mut want = 0;
3535

3636
// Assert all nodes are in non-voter state & have no entries.
37-
router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?;
38-
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?;
37+
router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty").await?;
38+
router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty").await?;
3939
router.assert_pristine_cluster().await;
4040

4141
// Initialize the cluster, then assert that a stable cluster was formed & held.
4242
tracing::info!("--- initializing cluster");
4343
router.initialize_from_single_node(0).await?;
4444
want += 1;
4545

46-
router.wait_for_log(&hashset![0, 1, 2], want, None, "leader init log").await?;
47-
router.wait_for_state(&hashset![0], State::Leader, None, "init").await?;
46+
router.wait_for_log(&btreeset![0, 1, 2], want, None, "leader init log").await?;
47+
router.wait_for_state(&btreeset![0], State::Leader, None, "init").await?;
4848

4949
router.assert_stable_cluster(Some(1), Some(want)).await;
5050

@@ -60,7 +60,7 @@ async fn client_writes() -> Result<()> {
6060
while clients.next().await.is_some() {}
6161

6262
want = 6001;
63-
router.wait_for_log(&hashset![0, 1, 2], want, None, "sync logs").await?;
63+
router.wait_for_log(&btreeset![0, 1, 2], want, None, "sync logs").await?;
6464

6565
router.assert_stable_cluster(Some(1), Some(want)).await; // The extra 1 is from the leader's initial commit entry.
6666
router
@@ -70,7 +70,7 @@ async fn client_writes() -> Result<()> {
7070
Some(0),
7171
LogId { term: 1, index: want },
7272
Some(((5000..5100).into(), 1, MembershipConfig {
73-
members: hashset![0, 1, 2],
73+
members: btreeset![0, 1, 2],
7474
members_after_consensus: None,
7575
})),
7676
)

0 commit comments

Comments
 (0)