Skip to content

Commit 1f3bf20

Browse files
committed
Improve: create a channel notify specifically for interal messages
`tx_notify` will be used for components such as state-machine worker or replication stream to send back notification when an action is done. `tx_api` is left for receiving only external messages, such as append-entries request or client-write request. A `Balancer` is added to prevent one channel from starving the others. The benchmark shows a better performance with 64 clients: | clients | put/s | ns/op | Changes | | --: | --: | --: | :-- | | 64 | **730,000** | 1,369 | This commit | | 64 | **652,000** | 1,532 | Previous commit |
1 parent 168db20 commit 1f3bf20

File tree

9 files changed

+305
-173
lines changed

9 files changed

+305
-173
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ Benchmark history:
9595

9696
| Date | clients | put/s | ns/op | Changes |
9797
| :-- | --: | --: | --: | :-- |
98+
| 2023-04-25 | 64 | **730,000** | 1,369 | Split channels |
9899
| 2023-04-24 | 64 | **652,000** | 1,532 | Reduce metrics report rate |
99100
| 2023-04-23 | 64 | **467,000** | 2,139 | State-machine moved to separate task |
100101
| | 1 | 70,000 | **14,273** | |

openraft/src/core/balancer.rs

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
//! This mod defines and manipulates the ratio of multiple messages to handle.
2+
//!
3+
//! The ratio should be adjust so that every channel won't be starved.
4+
5+
/// Balance the ratio of different kind of message to handle.
6+
pub(crate) struct Balancer {
7+
total: u64,
8+
9+
/// The number of RaftMsg to handle in each round.
10+
raft_msg: u64,
11+
}
12+
13+
impl Balancer {
14+
pub(crate) fn new(total: u64) -> Self {
15+
Self {
16+
total,
17+
// RaftMsg is the input entry.
18+
// We should consume as many as internal messages as possible.
19+
raft_msg: total / 10,
20+
}
21+
}
22+
23+
pub(crate) fn raft_msg(&self) -> u64 {
24+
self.raft_msg
25+
}
26+
27+
pub(crate) fn notify(&self) -> u64 {
28+
self.total - self.raft_msg
29+
}
30+
31+
pub(crate) fn increase_notify(&mut self) {
32+
self.raft_msg = self.raft_msg * 15 / 16;
33+
if self.raft_msg == 0 {
34+
self.raft_msg = 1;
35+
}
36+
}
37+
38+
pub(crate) fn increase_raft_msg(&mut self) {
39+
self.raft_msg = self.raft_msg * 17 / 16;
40+
41+
// Always leave some budget for other channels
42+
if self.raft_msg > self.total / 2 {
43+
self.raft_msg = self.total / 2;
44+
}
45+
}
46+
}

openraft/src/core/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
//! Also it receives and execute `Command` emitted by `Engine` to apply raft state to underlying
55
//! storage or forward messages to other raft nodes.
66
7+
pub(crate) mod balancer;
8+
pub(crate) mod notify;
79
mod raft_core;
810
mod replication_state;
911
mod server_state;

openraft/src/core/notify.rs

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use crate::core::sm;
2+
use crate::raft::VoteResponse;
3+
use crate::replication::ReplicationResult;
4+
use crate::replication::ReplicationSessionId;
5+
use crate::MessageSummary;
6+
use crate::RaftTypeConfig;
7+
use crate::StorageError;
8+
use crate::Vote;
9+
10+
/// A message coming from the internal components.
11+
pub(crate) enum Notify<C>
12+
where C: RaftTypeConfig
13+
{
14+
VoteResponse {
15+
target: C::NodeId,
16+
resp: VoteResponse<C::NodeId>,
17+
18+
/// Which ServerState sent this message. It is also the requested vote.
19+
vote: Vote<C::NodeId>,
20+
},
21+
22+
/// A tick event to wake up RaftCore to check timeout etc.
23+
Tick {
24+
/// ith tick
25+
i: u64,
26+
},
27+
28+
// /// Logs that are submitted to append has been persisted to disk.
29+
// LogPersisted {},
30+
/// Update the `matched` log id of a replication target.
31+
/// Sent by a replication task `ReplicationCore`.
32+
UpdateReplicationProgress {
33+
/// The ID of the target node for which the match index is to be updated.
34+
target: C::NodeId,
35+
36+
/// The id of the subject that submit this replication action.
37+
///
38+
/// It is only used for debugging purpose.
39+
id: u64,
40+
41+
/// Either the last log id that has been successfully replicated to the target,
42+
/// or an error in string.
43+
result: Result<ReplicationResult<C::NodeId>, String>,
44+
45+
/// In which session this message is sent.
46+
/// A replication session(vote,membership_log_id) should ignore message from other session.
47+
session_id: ReplicationSessionId<C::NodeId>,
48+
},
49+
50+
/// [`StorageError`] error has taken place locally(not on remote node) when replicating, and
51+
/// [`RaftCore`] needs to shutdown. Sent by a replication task
52+
/// [`crate::replication::ReplicationCore`].
53+
ReplicationStorageError { error: StorageError<C::NodeId> },
54+
55+
/// ReplicationCore has seen a higher `vote`.
56+
/// Sent by a replication task `ReplicationCore`.
57+
HigherVote {
58+
/// The ID of the target node from which the new term was observed.
59+
target: C::NodeId,
60+
61+
/// The higher vote observed.
62+
higher: Vote<C::NodeId>,
63+
64+
/// Which ServerState sent this message
65+
vote: Vote<C::NodeId>,
66+
// TODO: need this?
67+
// /// The cluster this replication works for.
68+
// membership_log_id: Option<LogId<C::NodeId>>,
69+
},
70+
71+
/// Result of executing a command sent from state machine worker.
72+
StateMachine { command_result: sm::CommandResult<C> },
73+
}
74+
75+
impl<C> MessageSummary<Notify<C>> for Notify<C>
76+
where C: RaftTypeConfig
77+
{
78+
fn summary(&self) -> String {
79+
match self {
80+
Notify::VoteResponse { target, resp, vote } => {
81+
format!("VoteResponse: from: {}: {}, res-vote: {}", target, resp.summary(), vote)
82+
}
83+
Notify::Tick { i } => {
84+
format!("Tick {}", i)
85+
}
86+
Notify::UpdateReplicationProgress {
87+
ref target,
88+
ref id,
89+
ref result,
90+
ref session_id,
91+
} => {
92+
format!(
93+
"UpdateReplicationProgress: target: {}, id: {}, result: {:?}, session_id: {}",
94+
target, id, result, session_id,
95+
)
96+
}
97+
Notify::HigherVote {
98+
ref target,
99+
higher: ref new_vote,
100+
ref vote,
101+
} => {
102+
format!(
103+
"Seen a higher vote: target: {}, vote: {}, server_state_vote: {}",
104+
target, new_vote, vote
105+
)
106+
}
107+
Notify::ReplicationStorageError { error } => format!("ReplicationFatal: {}", error),
108+
Notify::StateMachine { command_result: done } => {
109+
format!("StateMachine command done: {:?}", done)
110+
}
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)