Skip to content

Commit 4d0918f

Browse files
committed
Feature: Add rocks based example
Signed-off-by: Heinz N. Gies <[email protected]>
1 parent 401d427 commit 4d0918f

17 files changed

+1752
-7
lines changed

Cargo.toml

+2-7
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
11
[workspace]
2-
members = [
3-
"openraft",
4-
"memstore",
5-
]
6-
exclude = [
7-
"examples/raft-kv-memstore",
8-
]
2+
members = ["openraft", "memstore"]
3+
exclude = ["examples/raft-kv-memstore", "examples/raft-kv-rocksdb"]

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defensive_test:
66
test: lint fmt
77
cargo test
88
cargo test --manifest-path examples/raft-kv-memstore/Cargo.toml
9+
cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml
910

1011
bench_cluster_of_1:
1112
cargo test --package openraft --test benchmark --release bench_cluster_of_1 -- --ignored --nocapture

examples/raft-kv-rocksdb/.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
target
2+
vendor
3+
.idea
4+
*.db
5+
*.log

examples/raft-kv-rocksdb/Cargo.toml

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
[package]
2+
name = "raft-key-value-rocks"
3+
version = "0.1.0"
4+
edition = "2021"
5+
authors = [
6+
"drdr xp <[email protected]>",
7+
"Pedro Paulo de Amorim <[email protected]>",
8+
"The Tremor Team",
9+
]
10+
categories = ["algorithms", "asynchronous", "data-structures"]
11+
description = "An example distributed key-value store built upon `openraft`."
12+
homepage = "https://github.com/datafuselabs/openraft"
13+
keywords = ["raft", "consensus"]
14+
license = "MIT/Apache-2.0"
15+
repository = "https://github.com/datafuselabs/openraft"
16+
readme = "README.md"
17+
18+
[[bin]]
19+
name = "raft-key-value-rocks"
20+
path = "src/bin/main.rs"
21+
22+
[dependencies]
23+
rocksdb = "0.18.0"
24+
tide = { version = "0.16" }
25+
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
26+
async-trait = "0.1.36"
27+
clap = { version = "3.0.13", features = ["derive", "env"] }
28+
tracing-subscriber = "0.3.0"
29+
openraft = { version = "0.6", path = "../../openraft", features = ["serde"] }
30+
reqwest = { version = "0.11.9", features = ["json"] }
31+
serde = { version = "1.0.114", features = ["derive"] }
32+
serde_json = "1.0.57"
33+
tracing = "0.1.29"
34+
tracing-futures = "0.2.4"
35+
toy-rpc = { version = "*", features = [
36+
"ws_async_std",
37+
"server",
38+
"client",
39+
"async_std_runtime",
40+
] }
41+
byteorder = "1.4.3"
42+
43+
[dev-dependencies]
44+
maplit = "1.0.2"
45+
tempdir = "0.3.0"
46+
47+
[features]
48+
docinclude = [] # Used only for activating `doc(include="...")` on nightly.
49+
50+
[package.metadata.docs.rs]
51+
features = ["docinclude"] # Activate `docinclude` during docs.rs build.

examples/raft-kv-rocksdb/src/app.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use std::sync::Arc;
2+
3+
use openraft::Config;
4+
5+
use crate::ExampleNodeId;
6+
use crate::ExampleRaft;
7+
use crate::ExampleStore;
8+
9+
// Representation of an application state. This struct can be shared around to share
10+
// instances of raft, store and more.
11+
pub struct ExampleApp {
12+
pub id: ExampleNodeId,
13+
pub api_addr: String,
14+
pub rcp_addr: String,
15+
pub raft: ExampleRaft,
16+
pub store: Arc<ExampleStore>,
17+
pub config: Arc<Config>,
18+
}
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use clap::Parser;
2+
use openraft::Raft;
3+
use raft_key_value_rocks::network::raft_network_impl::ExampleNetwork;
4+
use raft_key_value_rocks::start_example_raft_node;
5+
use raft_key_value_rocks::store::ExampleStore;
6+
use raft_key_value_rocks::ExampleTypeConfig;
7+
8+
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, ExampleStore>;
9+
10+
#[derive(Parser, Clone, Debug)]
11+
#[clap(author, version, about, long_about = None)]
12+
pub struct Opt {
13+
#[clap(long)]
14+
pub id: u64,
15+
16+
#[clap(long)]
17+
pub http_addr: String,
18+
19+
#[clap(long)]
20+
pub rpc_addr: String,
21+
}
22+
23+
#[async_std::main]
24+
async fn main() -> std::io::Result<()> {
25+
// Setup the logger
26+
tracing_subscriber::fmt().init();
27+
28+
// Parse the parameters passed by arguments.
29+
let options = Opt::parse();
30+
31+
start_example_raft_node(
32+
options.id,
33+
format!("{}.db", options.rpc_addr),
34+
options.http_addr,
35+
options.rpc_addr,
36+
)
37+
.await
38+
}
+221
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
use std::collections::BTreeSet;
2+
use std::sync::Arc;
3+
use std::sync::Mutex;
4+
5+
use openraft::error::AddLearnerError;
6+
use openraft::error::CheckIsLeaderError;
7+
use openraft::error::ClientWriteError;
8+
use openraft::error::ForwardToLeader;
9+
use openraft::error::Infallible;
10+
use openraft::error::InitializeError;
11+
use openraft::error::NetworkError;
12+
use openraft::error::RPCError;
13+
use openraft::error::RemoteError;
14+
use openraft::raft::AddLearnerResponse;
15+
use openraft::raft::ClientWriteResponse;
16+
use openraft::RaftMetrics;
17+
use reqwest::Client;
18+
use serde::de::DeserializeOwned;
19+
use serde::Deserialize;
20+
use serde::Serialize;
21+
22+
use crate::ExampleNodeId;
23+
use crate::ExampleRequest;
24+
use crate::ExampleTypeConfig;
25+
26+
#[derive(Debug, Clone, Serialize, Deserialize)]
27+
pub struct Empty {}
28+
29+
pub struct ExampleClient {
30+
/// The leader node to send request to.
31+
///
32+
/// All traffic should be sent to the leader in a cluster.
33+
pub leader: Arc<Mutex<(ExampleNodeId, String)>>,
34+
35+
pub inner: Client,
36+
}
37+
38+
impl ExampleClient {
39+
/// Create a client with a leader node id and a node manager to get node address by node id.
40+
pub fn new(leader_id: ExampleNodeId, leader_addr: String) -> Self {
41+
Self {
42+
leader: Arc::new(Mutex::new((leader_id, leader_addr))),
43+
inner: reqwest::Client::new(),
44+
}
45+
}
46+
47+
// --- Application API
48+
49+
/// Submit a write request to the raft cluster.
50+
///
51+
/// The request will be processed by raft protocol: it will be replicated to a quorum and then will be applied to
52+
/// state machine.
53+
///
54+
/// The result of applying the request will be returned.
55+
pub async fn write(
56+
&self,
57+
req: &ExampleRequest,
58+
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
59+
self.send_rpc_to_leader("api/write", Some(req)).await
60+
}
61+
62+
/// Read value by key, in an inconsistent mode.
63+
///
64+
/// This method may return stale value because it does not force to read on a legal leader.
65+
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, Infallible>> {
66+
self.do_send_rpc_to_leader("api/read", Some(req)).await
67+
}
68+
69+
/// Consistent Read value by key, in an inconsistent mode.
70+
///
71+
/// This method MUST return consitent value or CheckIsLeaderError.
72+
pub async fn consistent_read(
73+
&self,
74+
req: &String,
75+
) -> Result<String, RPCError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId>>> {
76+
self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await
77+
}
78+
79+
// --- Cluster management API
80+
81+
/// Initialize a cluster of only the node that receives this request.
82+
///
83+
/// This is the first step to initialize a cluster.
84+
/// With a initialized cluster, new node can be added with [`write`].
85+
/// Then setup replication with [`add_learner`].
86+
/// Then make the new node a member with [`change_membership`].
87+
pub async fn init(&self) -> Result<(), RPCError<ExampleNodeId, InitializeError<ExampleNodeId>>> {
88+
self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await
89+
}
90+
91+
/// Add a node as learner.
92+
///
93+
/// The node to add has to exist, i.e., being added with `write(ExampleRequest::AddNode{})`
94+
pub async fn add_learner(
95+
&self,
96+
req: (ExampleNodeId, String, String),
97+
) -> Result<AddLearnerResponse<ExampleNodeId>, RPCError<ExampleNodeId, AddLearnerError<ExampleNodeId>>> {
98+
self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await
99+
}
100+
101+
/// Change membership to the specified set of nodes.
102+
///
103+
/// All nodes in `req` have to be already added as learner with [`add_learner`],
104+
/// or an error [`LearnerNotFound`] will be returned.
105+
pub async fn change_membership(
106+
&self,
107+
req: &BTreeSet<ExampleNodeId>,
108+
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
109+
self.send_rpc_to_leader("cluster/change-membership", Some(req)).await
110+
}
111+
112+
/// Get the metrics about the cluster.
113+
///
114+
/// Metrics contains various information about the cluster, such as current leader,
115+
/// membership config, replication status etc.
116+
/// See [`RaftMetrics`].
117+
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId>, RPCError<ExampleNodeId, Infallible>> {
118+
self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await
119+
}
120+
121+
// --- Internal methods
122+
123+
/// Send RPC to specified node.
124+
///
125+
/// It sends out a POST request if `req` is Some. Otherwise a GET request.
126+
/// The remote endpoint must respond a reply in form of `Result<T, E>`.
127+
/// An `Err` happened on remote will be wrapped in an [`RPCError::RemoteError`].
128+
async fn do_send_rpc_to_leader<Req, Resp, Err>(
129+
&self,
130+
uri: &str,
131+
req: Option<&Req>,
132+
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
133+
where
134+
Req: Serialize + 'static,
135+
Resp: Serialize + DeserializeOwned,
136+
Err: std::error::Error + Serialize + DeserializeOwned,
137+
{
138+
let (leader_id, url) = {
139+
let t = self.leader.lock().unwrap();
140+
let target_addr = &t.1;
141+
(t.0, format!("http://{}/{}", target_addr, uri))
142+
};
143+
144+
let resp = if let Some(r) = req {
145+
println!(
146+
">>> client send request to {}: {}",
147+
url,
148+
serde_json::to_string_pretty(&r).unwrap()
149+
);
150+
self.inner.post(url.clone()).json(r)
151+
} else {
152+
println!(">>> client send request to {}", url,);
153+
self.inner.get(url.clone())
154+
}
155+
.send()
156+
.await
157+
.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
158+
159+
let res: Result<Resp, Err> = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
160+
println!(
161+
"<<< client recv reply from {}: {}",
162+
url,
163+
serde_json::to_string_pretty(&res).unwrap()
164+
);
165+
166+
res.map_err(|e| RPCError::RemoteError(RemoteError::new(leader_id, e)))
167+
}
168+
169+
/// Try the best to send a request to the leader.
170+
///
171+
/// If the target node is not a leader, a `ForwardToLeader` error will be
172+
/// returned and this client will retry at most 3 times to contact the updated leader.
173+
async fn send_rpc_to_leader<Req, Resp, Err>(
174+
&self,
175+
uri: &str,
176+
req: Option<&Req>,
177+
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
178+
where
179+
Req: Serialize + 'static,
180+
Resp: Serialize + DeserializeOwned,
181+
Err: std::error::Error + Serialize + DeserializeOwned + TryInto<ForwardToLeader<ExampleNodeId>> + Clone,
182+
{
183+
// Retry at most 3 times to find a valid leader.
184+
let mut n_retry = 3;
185+
186+
loop {
187+
let res: Result<Resp, RPCError<ExampleNodeId, Err>> = self.do_send_rpc_to_leader(uri, req).await;
188+
189+
let rpc_err = match res {
190+
Ok(x) => return Ok(x),
191+
Err(rpc_err) => rpc_err,
192+
};
193+
194+
if let RPCError::RemoteError(remote_err) = &rpc_err {
195+
let forward_err_res =
196+
<Err as TryInto<ForwardToLeader<ExampleNodeId>>>::try_into(remote_err.source.clone());
197+
198+
if let Ok(ForwardToLeader {
199+
leader_id: Some(leader_id),
200+
leader_node: Some(leader_node),
201+
..
202+
}) = forward_err_res
203+
{
204+
// Update target to the new leader.
205+
{
206+
let mut t = self.leader.lock().unwrap();
207+
let api_addr = leader_node.data.get("api_addr").unwrap().clone();
208+
*t = (leader_id, api_addr);
209+
}
210+
211+
n_retry -= 1;
212+
if n_retry > 0 {
213+
continue;
214+
}
215+
}
216+
}
217+
218+
return Err(rpc_err);
219+
}
220+
}
221+
}

0 commit comments

Comments
 (0)