Skip to content

Commit c508a35

Browse files
committed
Feature: Raft::client_write_ff() ff for fire-and-forget
`Raft<C>::client_write_ff() -> C::Responder::Receiver` submit a client request to Raft to update the state machine, returns an application defined response receiver `Responder::Receiver` to receive the response. `_ff` means fire and forget. It is same as [`Raft::client_write`] but does not wait for the response. When using this method, it is the application's responsibility for defining mechanism building and consuming the `Responder::Receiver`. - Part of #1068
1 parent 3b18517 commit c508a35

File tree

2 files changed

+52
-6
lines changed

2 files changed

+52
-6
lines changed

openraft/src/raft/mod.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -736,15 +736,29 @@ where C: RaftTypeConfig
736736
ResponderReceiverOf<C>: Future<Output = Result<WriteResult<C>, E>>,
737737
E: Error + OptionalSend,
738738
{
739-
let (app_data, tx, rx) = ResponderOf::<C>::from_app_data(app_data);
739+
let rx = self.client_write_ff(app_data).await?;
740740

741-
self.inner.send_msg(RaftMsg::ClientWriteRequest { app_data, tx }).await?;
742741
let res: WriteResult<C> = self.inner.recv_msg(rx).await?;
743742

744743
let client_write_response = res.map_err(RaftError::APIError)?;
745744
Ok(client_write_response)
746745
}
747746

747+
/// Submit a mutating client request to Raft to update the state machine, returns an application
748+
/// defined response receiver [`Responder::Receiver`].
749+
///
750+
/// `_ff` means fire and forget.
751+
///
752+
/// It is same as [`Raft::client_write`] but does not wait for the response.
753+
#[tracing::instrument(level = "debug", skip(self, app_data))]
754+
pub async fn client_write_ff(&self, app_data: C::D) -> Result<ResponderReceiverOf<C>, Fatal<C::NodeId>> {
755+
let (app_data, tx, rx) = ResponderOf::<C>::from_app_data(app_data);
756+
757+
self.inner.send_msg(RaftMsg::ClientWriteRequest { app_data, tx }).await?;
758+
759+
Ok(rx)
760+
}
761+
748762
/// Return `true` if this node is already initialized and can not be initialized again with
749763
/// [`Raft::initialize`]
750764
pub async fn is_initialized(&self) -> Result<bool, Fatal<C::NodeId>> {

tests/tests/client_api/t10_client_writes.rs

+36-4
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ use std::sync::Arc;
33
use anyhow::Result;
44
use futures::prelude::*;
55
use maplit::btreeset;
6+
use openraft::raft::ClientWriteResponse;
67
use openraft::CommittedLeaderId;
78
use openraft::Config;
89
use openraft::LogId;
910
use openraft::SnapshotPolicy;
11+
use openraft_memstore::ClientRequest;
12+
use openraft_memstore::IntoMemClientRequest;
13+
use openraft_memstore::TypeConfig;
1014

1115
use crate::fixtures::init_default_ut_tracing;
1216
use crate::fixtures::RaftRouter;
1317

14-
/// Client write tests.
15-
///
16-
/// What does this test do?
17-
///
1818
/// - create a stable 3-node cluster.
1919
/// - write a lot of data to it.
2020
/// - assert that the cluster stayed stable and has all of the expected data.
@@ -63,3 +63,35 @@ async fn client_writes() -> Result<()> {
6363

6464
Ok(())
6565
}
66+
67+
/// Test Raft::client_write_ff,
68+
///
69+
/// Manually receive the client-write response via the returned `Responder::Receiver`
70+
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
71+
async fn client_write_ff() -> Result<()> {
72+
let config = Arc::new(
73+
Config {
74+
enable_tick: false,
75+
..Default::default()
76+
}
77+
.validate()?,
78+
);
79+
80+
let mut router = RaftRouter::new(config.clone());
81+
82+
tracing::info!("--- initializing cluster");
83+
let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;
84+
let _ = log_index;
85+
86+
let n0 = router.get_raft_handle(&0)?;
87+
88+
let resp_rx = n0.client_write_ff(ClientRequest::make_request("foo", 2)).await?;
89+
let got: ClientWriteResponse<TypeConfig> = resp_rx.await??;
90+
assert_eq!(None, got.response().0.as_deref());
91+
92+
let resp_rx = n0.client_write_ff(ClientRequest::make_request("foo", 3)).await?;
93+
let got: ClientWriteResponse<TypeConfig> = resp_rx.await??;
94+
assert_eq!(Some("request-2"), got.response().0.as_deref());
95+
96+
Ok(())
97+
}

0 commit comments

Comments
 (0)