Skip to content

Commit a9f2aa4

Browse files
authored
[ISSUE #270]✨Implement remoting client-1 (#271)
1 parent 4a68474 commit a9f2aa4

File tree

7 files changed

+197
-28
lines changed

7 files changed

+197
-28
lines changed

rocketmq-common/src/common/future.rs

+13
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ struct CompletableFutureState<T> {
4040
waker: Option<Waker>,
4141
/// The data value contained within the CompletableFuture upon completion.
4242
data: Option<T>,
43+
44+
/// An optional error value contained within the CompletableFuture upon completion.
45+
error: Option<Box<dyn std::error::Error + Send + Sync>>,
4346
}
4447

4548
/// A CompletableFuture represents a future value that may be completed or pending.
@@ -64,6 +67,7 @@ impl<T: Send + 'static> CompletableFuture<T> {
6467
completed: State::Pending,
6568
waker: None,
6669
data: None,
70+
error: None,
6771
}));
6872
let arc = status.clone();
6973

@@ -101,6 +105,15 @@ impl<T: Send + 'static> CompletableFuture<T> {
101105
waker.wake();
102106
}
103107
}
108+
109+
pub fn complete_exceptionally(&mut self, error: Box<dyn std::error::Error + Send + Sync>) {
110+
let mut state = self.state.lock().unwrap();
111+
state.completed = State::Ready;
112+
state.error = Some(error);
113+
if let Some(waker) = state.waker.take() {
114+
waker.wake();
115+
}
116+
}
104117
}
105118

106119
impl<T> std::future::Future for CompletableFuture<T> {

rocketmq-remoting/src/clients.rs

+45-15
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::{
2424

2525
pub use blocking_client::BlockingClient;
2626
pub use client::Client;
27-
use rocketmq_common::TokioExecutorService;
27+
use rocketmq_common::{common::future::CompletableFuture, TokioExecutorService};
2828

2929
use crate::{
3030
net::ResponseFuture,
@@ -37,6 +37,7 @@ mod async_client;
3737
mod blocking_client;
3838

3939
mod client;
40+
mod rocketmq_default_impl;
4041

4142
#[derive(Default)]
4243
pub struct RemoteClient {
@@ -91,30 +92,48 @@ trait RemotingClient: RemotingService {
9192
request: RemotingCommand,
9293
timeout_millis: u64,
9394
) -> Result<RemotingCommand, Box<dyn std::error::Error>>;
94-
fn invoke_async(
95+
96+
async fn invoke_async(
9597
&mut self,
9698
addr: String,
9799
request: RemotingCommand,
98100
timeout_millis: u64,
99-
invoke_callback: Arc<dyn InvokeCallback>,
101+
invoke_callback: impl InvokeCallback,
100102
) -> Result<(), Box<dyn std::error::Error>>;
103+
101104
fn invoke_oneway(
102105
&self,
103106
addr: String,
104107
request: RemotingCommand,
105108
timeout_millis: u64,
106109
) -> Result<(), Box<dyn std::error::Error>>;
107110

108-
fn invoke(
111+
async fn invoke(
109112
&mut self,
110113
addr: String,
111114
request: RemotingCommand,
112115
timeout_millis: u64,
113-
) -> Result<RemotingCommand, Box<dyn std::error::Error>> {
114-
let future = Arc::new(DefaultInvokeCallback {});
115-
match self.invoke_async(addr, request, timeout_millis, future) {
116-
Ok(_) => Ok(RemotingCommand::default()),
117-
Err(e) => Err(e),
116+
) -> Result<CompletableFuture<RemotingCommand>, Box<dyn std::error::Error>> {
117+
let completable_future = CompletableFuture::new();
118+
let sender = completable_future.get_sender();
119+
match self
120+
.invoke_async(
121+
addr,
122+
request,
123+
timeout_millis,
124+
|response: Option<RemotingCommand>,
125+
error: Option<Box<dyn std::error::Error>>,
126+
_response_future: Option<ResponseFuture>| {
127+
if let Some(response) = response {
128+
let _ = sender.blocking_send(response);
129+
} else if let Some(_error) = error {
130+
}
131+
},
132+
)
133+
.await
134+
{
135+
Ok(_) => Ok(completable_future),
136+
Err(err) => Err(err),
118137
}
119138
}
120139

@@ -128,14 +147,25 @@ trait RemotingClient: RemotingService {
128147
fn set_callback_executor(&mut self, executor: Arc<TokioExecutorService>);
129148

130149
fn is_address_reachable(&mut self, addr: String);
131-
}
132150

133-
struct DefaultInvokeCallback;
151+
fn close_clients(&mut self, addrs: Vec<String>);
152+
}
134153

135-
impl InvokeCallback for DefaultInvokeCallback {
136-
fn operation_complete(&self, _response_future: ResponseFuture) {}
154+
impl<T> InvokeCallback for T
155+
where
156+
T: Fn(Option<RemotingCommand>, Option<Box<dyn std::error::Error>>, Option<ResponseFuture>)
157+
+ Send
158+
+ Sync,
159+
{
160+
fn operation_complete(&self, response_future: ResponseFuture) {
161+
self(None, None, Some(response_future))
162+
}
137163

138-
fn operation_succeed(&self, _response: RemotingCommand) {}
164+
fn operation_succeed(&self, response: RemotingCommand) {
165+
self(Some(response), None, None)
166+
}
139167

140-
fn operation_fail(&self, _throwable: Box<dyn std::error::Error>) {}
168+
fn operation_fail(&self, throwable: Box<dyn std::error::Error>) {
169+
self(None, Some(throwable), None)
170+
}
141171
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::{error::Error, sync::Arc};
18+
19+
use rocketmq_common::TokioExecutorService;
20+
21+
use crate::{
22+
clients::RemotingClient,
23+
protocol::remoting_command::RemotingCommand,
24+
remoting::{InvokeCallback, RemotingService},
25+
runtime::{
26+
config::client_config::TokioClientConfig, processor::RequestProcessor, RPCHook,
27+
ServiceBridge,
28+
},
29+
};
30+
31+
pub struct RocketmqDefaultClient {
32+
service_bridge: ServiceBridge,
33+
tokio_client_config: TokioClientConfig,
34+
}
35+
36+
impl RocketmqDefaultClient {
37+
pub fn new(tokio_client_config: TokioClientConfig) -> Self {
38+
Self {
39+
service_bridge: ServiceBridge::new(),
40+
tokio_client_config,
41+
}
42+
}
43+
}
44+
45+
#[allow(unused_variables)]
46+
impl RemotingService for RocketmqDefaultClient {
47+
async fn start(&mut self) {
48+
todo!()
49+
}
50+
51+
fn shutdown(&mut self) {
52+
todo!()
53+
}
54+
55+
fn register_rpc_hook(&mut self, hook: impl RPCHook) {
56+
todo!()
57+
}
58+
59+
fn clear_rpc_hook(&mut self) {
60+
todo!()
61+
}
62+
}
63+
64+
#[allow(unused_variables)]
65+
impl RemotingClient for RocketmqDefaultClient {
66+
fn update_name_server_address_list(&mut self, addrs: Vec<String>) {
67+
todo!()
68+
}
69+
70+
fn get_name_server_address_list(&self) -> Vec<String> {
71+
todo!()
72+
}
73+
74+
fn get_available_name_srv_list(&self) -> Vec<String> {
75+
todo!()
76+
}
77+
78+
fn invoke_sync(
79+
&mut self,
80+
addr: String,
81+
request: RemotingCommand,
82+
timeout_millis: u64,
83+
) -> Result<RemotingCommand, Box<dyn Error>> {
84+
todo!()
85+
}
86+
87+
async fn invoke_async(
88+
&mut self,
89+
addr: String,
90+
request: RemotingCommand,
91+
timeout_millis: u64,
92+
invoke_callback: impl InvokeCallback,
93+
) -> Result<(), Box<dyn Error>> {
94+
todo!()
95+
}
96+
97+
fn invoke_oneway(
98+
&self,
99+
addr: String,
100+
request: RemotingCommand,
101+
timeout_millis: u64,
102+
) -> Result<(), Box<dyn Error>> {
103+
todo!()
104+
}
105+
106+
fn register_processor(
107+
&mut self,
108+
request_code: i32,
109+
processor: impl RequestProcessor + Send + Sync + 'static,
110+
executor: Arc<TokioExecutorService>,
111+
) {
112+
todo!()
113+
}
114+
115+
fn set_callback_executor(&mut self, executor: Arc<TokioExecutorService>) {
116+
todo!()
117+
}
118+
119+
fn is_address_reachable(&mut self, addr: String) {
120+
todo!()
121+
}
122+
123+
fn close_clients(&mut self, addrs: Vec<String>) {
124+
todo!()
125+
}
126+
}

rocketmq-remoting/src/runtime.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
2626
};
2727

28-
mod config;
28+
pub mod config;
2929
pub mod processor;
3030
pub mod server;
3131

@@ -46,7 +46,7 @@ pub trait RPCHook: Send + Sync + 'static {
4646
);
4747
}
4848

49-
pub struct ServerInner {
49+
pub struct ServiceBridge {
5050
//Limiting the maximum number of one-way requests.
5151
pub(crate) semaphore_oneway: tokio::sync::Semaphore,
5252
//Limiting the maximum number of asynchronous requests.
@@ -67,7 +67,7 @@ pub struct ServerInner {
6767
pub(crate) rpc_hooks: Vec<Box<dyn RPCHook>>,
6868
}
6969

70-
impl ServerInner {
70+
impl ServiceBridge {
7171
pub fn new() -> Self {
7272
Self {
7373
semaphore_oneway: tokio::sync::Semaphore::new(1000),
@@ -82,13 +82,13 @@ impl ServerInner {
8282
}
8383
}
8484

85-
impl Default for ServerInner {
85+
impl Default for ServiceBridge {
8686
fn default() -> Self {
8787
Self::new()
8888
}
8989
}
9090

91-
impl ServerInner {
91+
impl ServiceBridge {
9292
pub fn process_message_received(
9393
&mut self,
9494
ctx: ConnectionHandlerContext,

rocketmq-remoting/src/runtime/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
* limitations under the License.
1616
*/
1717

18-
mod client_config;
18+
pub mod client_config;
1919
mod net_system_config;
2020
mod server_config;

rocketmq-remoting/src/runtime/config/client_config.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ lazy_static! {
2323
static ref NET_SYSTEM_CONFIG: NetSystemConfig = NetSystemConfig::new();
2424
}
2525

26-
pub struct NettyClientConfig {
26+
pub struct TokioClientConfig {
2727
// Worker thread number
2828
pub client_worker_threads: i32,
2929
pub client_callback_executor_threads: usize,
@@ -47,9 +47,9 @@ pub struct NettyClientConfig {
4747
pub enable_transparent_retry: bool,
4848
}
4949

50-
impl Default for NettyClientConfig {
50+
impl Default for TokioClientConfig {
5151
fn default() -> Self {
52-
NettyClientConfig {
52+
TokioClientConfig {
5353
client_worker_threads: NET_SYSTEM_CONFIG.client_worker_size,
5454
client_callback_executor_threads: num_cpus::get(),
5555
client_oneway_semaphore_value: NET_SYSTEM_CONFIG.client_oneway_semaphore_value,
@@ -80,7 +80,7 @@ mod tests {
8080

8181
#[test]
8282
fn test_default_config() {
83-
let default_config = NettyClientConfig::default();
83+
let default_config = TokioClientConfig::default();
8484

8585
assert_eq!(
8686
default_config.client_worker_threads,

rocketmq-remoting/src/server/rocketmq_server.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,21 @@ use tokio::{net::TcpListener, sync::broadcast, task::JoinHandle};
2525
use crate::{
2626
protocol::remoting_command::RemotingCommand,
2727
remoting::{InvokeCallback, RemotingService},
28-
runtime::{processor::RequestProcessor, server::run, RPCHook, ServerInner},
28+
runtime::{processor::RequestProcessor, server::run, RPCHook, ServiceBridge},
2929
server::{config::BrokerServerConfig, RemotingServer},
3030
};
3131

3232
pub struct RocketmqDefaultServer {
3333
pub(crate) broker_server_config: BrokerServerConfig,
34-
pub(crate) server_inner: ServerInner,
34+
pub(crate) server_inner: ServiceBridge,
3535
pub future: Option<JoinHandle<()>>,
3636
}
3737

3838
impl RocketmqDefaultServer {
3939
pub fn new(broker_server_config: BrokerServerConfig) -> Self {
4040
Self {
4141
broker_server_config,
42-
server_inner: ServerInner::new(),
42+
server_inner: ServiceBridge::new(),
4343
future: None,
4444
}
4545
}

0 commit comments

Comments
 (0)