Skip to content

Commit cccddcc

Browse files
committed
Refactor: remove ClientRequestEntry;
It does not need to cache the entry, only the response tx. Log entry can be fetched from storage by index. Only tx channels for sending back result to client are stored and are indexed by log-index.
1 parent 6f20e1f commit cccddcc

File tree

4 files changed

+28
-71
lines changed

4 files changed

+28
-71
lines changed

openraft/src/core/admin.rs

+1-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use std::collections::BTreeMap;
22
use std::collections::BTreeSet;
33
use std::option::Option::None;
4-
use std::sync::Arc;
54

65
use tracing::warn;
76

8-
use crate::core::client::ClientRequestEntry;
97
use crate::core::LeaderState;
108
use crate::core::LearnerState;
119
use crate::core::State;
@@ -287,12 +285,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
287285

288286
self.core.metrics_flags.set_data_changed();
289287

290-
let cr_entry = ClientRequestEntry {
291-
entry: Arc::new(entry),
292-
tx: resp_tx,
293-
};
294-
295-
self.replicate_client_request(cr_entry).await?;
288+
self.replicate_client_request(entry, resp_tx).await?;
296289

297290
Ok(())
298291
}

openraft/src/core/client.rs

+18-44
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::sync::Arc;
2-
31
use futures::future::TryFutureExt;
42
use futures::stream::FuturesUnordered;
53
use futures::stream::StreamExt;
@@ -32,24 +30,6 @@ use crate::RaftStorage;
3230
use crate::RaftTypeConfig;
3331
use crate::StorageError;
3432

35-
/// A wrapper around a ClientRequest which has been transformed into an Entry, along with its response channel.
36-
pub(super) struct ClientRequestEntry<C: RaftTypeConfig> {
37-
/// The Arc'd entry of the ClientRequest.
38-
///
39-
/// This value is Arc'd so that it may be sent across thread boundaries for replication
40-
/// without having to clone the data payload itself.
41-
pub entry: Arc<Entry<C>>,
42-
43-
/// The response channel for the request.
44-
pub tx: Option<RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>>,
45-
}
46-
47-
impl<C: RaftTypeConfig> MessageSummary for ClientRequestEntry<C> {
48-
fn summary(&self) -> String {
49-
format!("entry:{}", self.entry.summary())
50-
}
51-
}
52-
5333
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
5434
/// Commit the initial entry which new leaders are obligated to create when first coming to power, per §8.
5535
#[tracing::instrument(level = "trace", skip(self))]
@@ -58,12 +38,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
5838

5939
self.core.metrics_flags.set_data_changed();
6040

61-
let cr_entry = ClientRequestEntry {
62-
entry: Arc::new(entry),
63-
tx: None,
64-
};
65-
66-
self.replicate_client_request(cr_entry).await?;
41+
self.replicate_client_request(entry, None).await?;
6742

6843
Ok(())
6944
}
@@ -194,14 +169,10 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
194169
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>,
195170
) -> Result<(), StorageError<C::NodeId>> {
196171
let entry = self.core.append_payload_to_log(rpc.payload).await?;
197-
let entry = ClientRequestEntry {
198-
entry: Arc::new(entry),
199-
tx: Some(tx),
200-
};
201172

202173
self.core.metrics_flags.set_data_changed();
203174

204-
self.replicate_client_request(entry).await?;
175+
self.replicate_client_request(entry, Some(tx)).await?;
205176
Ok(())
206177
}
207178

@@ -210,15 +181,20 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
210181
/// NOTE WELL: this routine does not wait for the request to actually finish replication, it
211182
/// merely beings the process. Once the request is committed to the cluster, its response will
212183
/// be generated asynchronously.
213-
#[tracing::instrument(level = "debug", skip(self, req), fields(req=%req.summary()))]
184+
#[tracing::instrument(level = "debug", skip(self, entry, resp_tx), fields(req=%entry.summary()))]
214185
pub(super) async fn replicate_client_request(
215186
&mut self,
216-
req: ClientRequestEntry<C>,
187+
entry: Entry<C>,
188+
resp_tx: Option<RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>>,
217189
) -> Result<(), StorageError<C::NodeId>> {
218190
// Replicate the request if there are other cluster members. The client response will be
219191
// returned elsewhere after the entry has been committed to the cluster.
220192

221-
let log_id = req.entry.log_id;
193+
let log_id = entry.log_id;
194+
if let Some(tx) = resp_tx {
195+
self.client_resp_channels.insert(log_id.index, tx);
196+
}
197+
222198
let quorum_granted = self.core.effective_membership.membership.is_majority(&btreeset! {self.core.id});
223199

224200
if quorum_granted {
@@ -228,9 +204,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
228204
tracing::debug!(?self.core.committed, "update committed, no need to replicate");
229205

230206
self.core.metrics_flags.set_data_changed();
231-
self.client_request_post_commit(req).await?;
232-
} else {
233-
self.awaiting_committed.push(req);
207+
self.client_request_post_commit(log_id.index).await?;
234208
}
235209

236210
for node in self.nodes.values() {
@@ -247,16 +221,16 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
247221
}
248222

249223
/// Handle the post-commit logic for a client request.
250-
#[tracing::instrument(level = "debug", skip(self, req))]
251-
pub(super) async fn client_request_post_commit(
252-
&mut self,
253-
req: ClientRequestEntry<C>,
254-
) -> Result<(), StorageError<C::NodeId>> {
255-
let entry = &req.entry;
224+
#[tracing::instrument(level = "debug", skip(self))]
225+
pub(super) async fn client_request_post_commit(&mut self, log_index: u64) -> Result<(), StorageError<C::NodeId>> {
226+
let entries = self.core.storage.get_log_entries(log_index..=log_index).await?;
227+
let entry = &entries[0];
228+
229+
let tx = self.client_resp_channels.remove(&log_index);
256230

257231
let apply_res = self.apply_entry_to_state_machine(entry).await?;
258232

259-
self.send_response(entry, apply_res, req.tx).await;
233+
self.send_response(entry, apply_res, tx).await;
260234

261235
// Trigger log compaction if needed.
262236
self.core.trigger_log_compaction_if_needed(false).await;

openraft/src/core/mod.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use tracing::Span;
3535

3636
use crate::config::Config;
3737
use crate::config::SnapshotPolicy;
38-
use crate::core::client::ClientRequestEntry;
3938
use crate::error::AddLearnerError;
39+
use crate::error::ClientWriteError;
4040
use crate::error::ExtractFatal;
4141
use crate::error::Fatal;
4242
use crate::error::ForwardToLeader;
@@ -45,6 +45,7 @@ use crate::error::NotAllowed;
4545
use crate::metrics::RaftMetrics;
4646
use crate::metrics::ReplicationMetrics;
4747
use crate::raft::AddLearnerResponse;
48+
use crate::raft::ClientWriteResponse;
4849
use crate::raft::RaftMsg;
4950
use crate::raft::RaftRespTx;
5051
use crate::raft::VoteResponse;
@@ -797,8 +798,8 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
797798
/// The cloneable sender channel for replication stream events.
798799
pub(super) replication_tx: mpsc::UnboundedSender<(ReplicaEvent<C, S::SnapshotData>, Span)>,
799800

800-
/// A buffer of client requests which have been appended locally and are awaiting to be committed to the cluster.
801-
pub(super) awaiting_committed: Vec<ClientRequestEntry<C>>,
801+
/// Channels to send result back to client when logs are committed.
802+
pub(super) client_resp_channels: BTreeMap<u64, RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>>,
802803
}
803804

804805
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsProvider<C::NodeId>
@@ -819,7 +820,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
819820
replication_metrics: Versioned::new(ReplicationMetrics::default()),
820821
replication_tx,
821822
replication_rx,
822-
awaiting_committed: Vec::new(),
823+
client_resp_channels: Default::default(),
823824
}
824825
}
825826

openraft/src/core/replication.rs

+4-15
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::summary::MessageSummary;
2020
use crate::versioned::Updatable;
2121
use crate::vote::Vote;
2222
use crate::LogId;
23+
use crate::LogIdOptionExt;
2324
use crate::RaftNetworkFactory;
2425
use crate::RaftStorage;
2526
use crate::RaftTypeConfig;
@@ -156,21 +157,9 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
156157
));
157158
}
158159

159-
// Check if there are any pending requests which need to be processed.
160-
let filter = self
161-
.awaiting_committed
162-
.iter()
163-
.enumerate()
164-
.take_while(|(_idx, elem)| Some(elem.entry.log_id) <= self.core.committed)
165-
.last()
166-
.map(|(idx, _)| idx);
167-
168-
if let Some(offset) = filter {
169-
// Build a new ApplyLogsTask from each of the given client requests.
170-
171-
for request in self.awaiting_committed.drain(..=offset).collect::<Vec<_>>() {
172-
self.client_request_post_commit(request).await?;
173-
}
160+
// Apply committed entries, and send applying result to client if there is a channel awaiting it
161+
for i in self.core.last_applied.next_index()..self.core.committed.next_index() {
162+
self.client_request_post_commit(i).await?;
174163
}
175164

176165
self.core.metrics_flags.set_data_changed();

0 commit comments

Comments
 (0)