Skip to content

Commit dd85847

Browse files
committed
chore: remove unused leader_tx broadcast channel
Signed-off-by: bsbds <[email protected]>
1 parent daa17b3 commit dd85847

File tree

3 files changed

+1
-36
lines changed

3 files changed

+1
-36
lines changed

crates/curp/src/server/curp_node/mod.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ use futures::{pin_mut, stream::FuturesUnordered, Stream, StreamExt};
1212
use madsim::rand::{thread_rng, Rng};
1313
use opentelemetry::KeyValue;
1414
use parking_lot::{Mutex, RwLock};
15-
use tokio::{
16-
sync::{broadcast, oneshot},
17-
time::MissedTickBehavior,
18-
};
15+
use tokio::{sync::oneshot, time::MissedTickBehavior};
1916
#[cfg(not(madsim))]
2017
use tonic::transport::ClientTlsConfig;
2118
use tracing::{debug, error, info, trace, warn};
@@ -41,7 +38,6 @@ use crate::{
4138
cmd::{Command, CommandExecutor},
4239
log_entry::{EntryData, LogEntry},
4340
member::{Membership, MembershipInfo},
44-
members::ServerId,
4541
response::ResponseSender,
4642
role_change::RoleChange,
4743
rpc::{
@@ -967,11 +963,6 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {
967963
None
968964
}
969965

970-
/// Get a rx for leader changes
971-
pub(super) fn leader_rx(&self) -> broadcast::Receiver<Option<ServerId>> {
972-
self.curp.leader_rx()
973-
}
974-
975966
/// Send `append_entries` request
976967
/// Return `tonic::Error` if meet network issue
977968
/// Return (`leader_retires`, `ae_succeed`)

crates/curp/src/server/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::sync::Arc;
44
use engine::SnapshotAllocator;
55
use flume::r#async::RecvStream;
66
use futures::{Stream, StreamExt};
7-
use tokio::sync::broadcast;
87
#[cfg(not(madsim))]
98
use tonic::transport::ClientTlsConfig;
109
use tracing::instrument;
@@ -21,7 +20,6 @@ pub use self::raw_curp::RawCurp;
2120
use crate::cmd::Command;
2221
use crate::cmd::CommandExecutor;
2322
use crate::member::MembershipInfo;
24-
use crate::members::ServerId;
2523
use crate::response::ResponseSender;
2624
use crate::role_change::RoleChange;
2725
use crate::rpc::connect::Bypass;
@@ -431,13 +429,6 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> Rpc<C, CE, RC> {
431429
Ok(())
432430
}
433431

434-
/// Get a subscriber for leader changes
435-
#[inline]
436-
#[must_use]
437-
pub fn leader_rx(&self) -> broadcast::Receiver<Option<ServerId>> {
438-
self.inner.leader_rx()
439-
}
440-
441432
/// Get raw curp
442433
#[inline]
443434
#[must_use]

crates/curp/src/server/raw_curp/mod.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use parking_lot::Mutex;
3333
use parking_lot::RwLock;
3434
use parking_lot::RwLockUpgradableReadGuard;
3535
use parking_lot::RwLockWriteGuard;
36-
use tokio::sync::broadcast;
3736
use tokio::sync::oneshot;
3837
#[cfg(not(madsim))]
3938
use tonic::transport::ClientTlsConfig;
@@ -334,9 +333,6 @@ struct Context<C: Command, RC: RoleChange> {
334333
cb: CmdBoardRef<C>,
335334
/// The lease manager
336335
lm: LeaseManagerRef,
337-
/// Tx to send leader changes
338-
#[builder(setter(skip))]
339-
leader_tx: broadcast::Sender<Option<ServerId>>,
340336
/// Election tick
341337
#[builder(setter(skip))]
342338
election_tick: AtomicU8,
@@ -389,7 +385,6 @@ impl<C: Command, RC: RoleChange> ContextBuilder<C, RC> {
389385
Some(value) => value,
390386
None => return Err(ContextBuilderError::UninitializedField("lm")),
391387
},
392-
leader_tx: broadcast::channel(1).0,
393388
election_tick: AtomicU8::new(0),
394389
sync_events: match self.sync_events.take() {
395390
Some(value) => value,
@@ -445,7 +440,6 @@ impl<C: Command, RC: RoleChange> Debug for Context<C, RC> {
445440
f.debug_struct("Context")
446441
.field("cfg", &self.cfg)
447442
.field("cb", &self.cb)
448-
.field("leader_tx", &self.leader_tx)
449443
.field("election_tick", &self.election_tick)
450444
.field("cmd_tx", &"CEEventTxApi")
451445
.field("sync_events", &self.sync_events)
@@ -722,13 +716,11 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
722716
let mut st_w = RwLockUpgradableReadGuard::upgrade(st_r);
723717
self.update_to_term_and_become_follower(&mut st_w, term);
724718
st_w.leader_id = Some(leader_id);
725-
let _ig = self.ctx.leader_tx.send(Some(leader_id)).ok();
726719
}
727720
std::cmp::Ordering::Equal => {
728721
if st_r.leader_id.is_none() {
729722
let mut st_w = RwLockUpgradableReadGuard::upgrade(st_r);
730723
st_w.leader_id = Some(leader_id);
731-
let _ig = self.ctx.leader_tx.send(Some(leader_id)).ok();
732724
}
733725
}
734726
std::cmp::Ordering::Greater => {
@@ -1204,11 +1196,6 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
12041196
self.ms.read().node_id()
12051197
}
12061198

1207-
/// Get a rx for leader changes
1208-
pub(super) fn leader_rx(&self) -> broadcast::Receiver<Option<ServerId>> {
1209-
self.ctx.leader_tx.subscribe()
1210-
}
1211-
12121199
/// Get the effective membership
12131200
pub(super) fn effective_membership(&self) -> Membership {
12141201
self.ms.read().cluster().effective().clone()
@@ -1436,7 +1423,6 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
14361423
st.role = Role::PreCandidate;
14371424
cst.votes_received = HashMap::from([(self.id(), true)]);
14381425
st.leader_id = None;
1439-
let _ig = self.ctx.leader_tx.send(None).ok();
14401426
self.reset_election_tick();
14411427

14421428
if prev_role == Role::Follower {
@@ -1479,7 +1465,6 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
14791465
st.role = Role::Candidate;
14801466
st.voted_for = Some(self.id());
14811467
st.leader_id = None;
1482-
let _ig = self.ctx.leader_tx.send(None).ok();
14831468
self.reset_election_tick();
14841469

14851470
let self_sp = self.ctx.spec_pool.map_lock(|sp| sp.all());
@@ -1522,7 +1507,6 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
15221507
metrics::get().leader_changes.add(1, &[]);
15231508
st.role = Role::Leader;
15241509
st.leader_id = Some(self.id());
1525-
let _ig = self.ctx.leader_tx.send(Some(self.id())).ok();
15261510
let _ignore = self.ctx.leader_event.notify(usize::MAX);
15271511
self.ctx.role_change.on_election_win();
15281512
debug!("{} becomes the leader", self.id());
@@ -1548,7 +1532,6 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
15481532
st.role = Role::Follower;
15491533
st.voted_for = None;
15501534
st.leader_id = None;
1551-
let _ig = self.ctx.leader_tx.send(None).ok();
15521535
st.randomize_timeout_ticks(); // regenerate timeout ticks
15531536
debug!(
15541537
"{} updates to term {term} and becomes a follower",

0 commit comments

Comments
 (0)