From 8a78b9672b3966672e30d5e758e9ba22afc0ccbe Mon Sep 17 00:00:00 2001 From: TeslaRustor <77013810+TeslaRustor@users.noreply.github.com> Date: Wed, 13 Mar 2024 03:57:09 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#270]=E2=9C=A8Implement=20remoting=20c?= =?UTF-8?q?lient-1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-common/src/common/future.rs | 13 ++ rocketmq-remoting/src/clients.rs | 60 ++++++--- .../src/clients/rocketmq_default_impl.rs | 126 ++++++++++++++++++ rocketmq-remoting/src/runtime.rs | 10 +- rocketmq-remoting/src/runtime/config.rs | 2 +- .../src/runtime/config/client_config.rs | 8 +- .../src/server/rocketmq_server.rs | 6 +- 7 files changed, 197 insertions(+), 28 deletions(-) create mode 100644 rocketmq-remoting/src/clients/rocketmq_default_impl.rs diff --git a/rocketmq-common/src/common/future.rs b/rocketmq-common/src/common/future.rs index ac25a5be..f99f6055 100644 --- a/rocketmq-common/src/common/future.rs +++ b/rocketmq-common/src/common/future.rs @@ -40,6 +40,9 @@ struct CompletableFutureState { waker: Option, /// The data value contained within the CompletableFuture upon completion. data: Option, + + /// An optional error value contained within the CompletableFuture upon completion. + error: Option>, } /// A CompletableFuture represents a future value that may be completed or pending. @@ -64,6 +67,7 @@ impl CompletableFuture { completed: State::Pending, waker: None, data: None, + error: None, })); let arc = status.clone(); @@ -101,6 +105,15 @@ impl CompletableFuture { waker.wake(); } } + + pub fn complete_exceptionally(&mut self, error: Box) { + let mut state = self.state.lock().unwrap(); + state.completed = State::Ready; + state.error = Some(error); + if let Some(waker) = state.waker.take() { + waker.wake(); + } + } } impl std::future::Future for CompletableFuture { diff --git a/rocketmq-remoting/src/clients.rs b/rocketmq-remoting/src/clients.rs index ce30f6d8..43a998a0 100644 --- a/rocketmq-remoting/src/clients.rs +++ b/rocketmq-remoting/src/clients.rs @@ -24,7 +24,7 @@ use std::{ pub use blocking_client::BlockingClient; pub use client::Client; -use rocketmq_common::TokioExecutorService; +use rocketmq_common::{common::future::CompletableFuture, TokioExecutorService}; use crate::{ net::ResponseFuture, @@ -37,6 +37,7 @@ mod async_client; mod blocking_client; mod client; +mod rocketmq_default_impl; #[derive(Default)] pub struct RemoteClient { @@ -91,13 +92,15 @@ trait RemotingClient: RemotingService { request: RemotingCommand, timeout_millis: u64, ) -> Result>; - fn invoke_async( + + async fn invoke_async( &mut self, addr: String, request: RemotingCommand, timeout_millis: u64, - invoke_callback: Arc, + invoke_callback: impl InvokeCallback, ) -> Result<(), Box>; + fn invoke_oneway( &self, addr: String, @@ -105,16 +108,32 @@ trait RemotingClient: RemotingService { timeout_millis: u64, ) -> Result<(), Box>; - fn invoke( + async fn invoke( &mut self, addr: String, request: RemotingCommand, timeout_millis: u64, - ) -> Result> { - let future = Arc::new(DefaultInvokeCallback {}); - match self.invoke_async(addr, request, timeout_millis, future) { - Ok(_) => Ok(RemotingCommand::default()), - Err(e) => Err(e), + ) -> Result, Box> { + let completable_future = CompletableFuture::new(); + let sender = completable_future.get_sender(); + match self + .invoke_async( + addr, + request, + timeout_millis, + |response: Option, + error: Option>, + _response_future: Option| { + if let Some(response) = response { + let _ = sender.blocking_send(response); + } else if let Some(_error) = error { + } + }, + ) + .await + { + Ok(_) => Ok(completable_future), + Err(err) => Err(err), } } @@ -128,14 +147,25 @@ trait RemotingClient: RemotingService { fn set_callback_executor(&mut self, executor: Arc); fn is_address_reachable(&mut self, addr: String); -} -struct DefaultInvokeCallback; + fn close_clients(&mut self, addrs: Vec); +} -impl InvokeCallback for DefaultInvokeCallback { - fn operation_complete(&self, _response_future: ResponseFuture) {} +impl InvokeCallback for T +where + T: Fn(Option, Option>, Option) + + Send + + Sync, +{ + fn operation_complete(&self, response_future: ResponseFuture) { + self(None, None, Some(response_future)) + } - fn operation_succeed(&self, _response: RemotingCommand) {} + fn operation_succeed(&self, response: RemotingCommand) { + self(Some(response), None, None) + } - fn operation_fail(&self, _throwable: Box) {} + fn operation_fail(&self, throwable: Box) { + self(None, Some(throwable), None) + } } diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs new file mode 100644 index 00000000..b5bdbc60 --- /dev/null +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::{error::Error, sync::Arc}; + +use rocketmq_common::TokioExecutorService; + +use crate::{ + clients::RemotingClient, + protocol::remoting_command::RemotingCommand, + remoting::{InvokeCallback, RemotingService}, + runtime::{ + config::client_config::TokioClientConfig, processor::RequestProcessor, RPCHook, + ServiceBridge, + }, +}; + +pub struct RocketmqDefaultClient { + service_bridge: ServiceBridge, + tokio_client_config: TokioClientConfig, +} + +impl RocketmqDefaultClient { + pub fn new(tokio_client_config: TokioClientConfig) -> Self { + Self { + service_bridge: ServiceBridge::new(), + tokio_client_config, + } + } +} + +#[allow(unused_variables)] +impl RemotingService for RocketmqDefaultClient { + async fn start(&mut self) { + todo!() + } + + fn shutdown(&mut self) { + todo!() + } + + fn register_rpc_hook(&mut self, hook: impl RPCHook) { + todo!() + } + + fn clear_rpc_hook(&mut self) { + todo!() + } +} + +#[allow(unused_variables)] +impl RemotingClient for RocketmqDefaultClient { + fn update_name_server_address_list(&mut self, addrs: Vec) { + todo!() + } + + fn get_name_server_address_list(&self) -> Vec { + todo!() + } + + fn get_available_name_srv_list(&self) -> Vec { + todo!() + } + + fn invoke_sync( + &mut self, + addr: String, + request: RemotingCommand, + timeout_millis: u64, + ) -> Result> { + todo!() + } + + async fn invoke_async( + &mut self, + addr: String, + request: RemotingCommand, + timeout_millis: u64, + invoke_callback: impl InvokeCallback, + ) -> Result<(), Box> { + todo!() + } + + fn invoke_oneway( + &self, + addr: String, + request: RemotingCommand, + timeout_millis: u64, + ) -> Result<(), Box> { + todo!() + } + + fn register_processor( + &mut self, + request_code: i32, + processor: impl RequestProcessor + Send + Sync + 'static, + executor: Arc, + ) { + todo!() + } + + fn set_callback_executor(&mut self, executor: Arc) { + todo!() + } + + fn is_address_reachable(&mut self, addr: String) { + todo!() + } + + fn close_clients(&mut self, addrs: Vec) { + todo!() + } +} diff --git a/rocketmq-remoting/src/runtime.rs b/rocketmq-remoting/src/runtime.rs index 9bacaa0f..d089965a 100644 --- a/rocketmq-remoting/src/runtime.rs +++ b/rocketmq-remoting/src/runtime.rs @@ -25,7 +25,7 @@ use crate::{ runtime::{processor::RequestProcessor, server::ConnectionHandlerContext}, }; -mod config; +pub mod config; pub mod processor; pub mod server; @@ -46,7 +46,7 @@ pub trait RPCHook: Send + Sync + 'static { ); } -pub struct ServerInner { +pub struct ServiceBridge { //Limiting the maximum number of one-way requests. pub(crate) semaphore_oneway: tokio::sync::Semaphore, //Limiting the maximum number of asynchronous requests. @@ -67,7 +67,7 @@ pub struct ServerInner { pub(crate) rpc_hooks: Vec>, } -impl ServerInner { +impl ServiceBridge { pub fn new() -> Self { Self { semaphore_oneway: tokio::sync::Semaphore::new(1000), @@ -82,13 +82,13 @@ impl ServerInner { } } -impl Default for ServerInner { +impl Default for ServiceBridge { fn default() -> Self { Self::new() } } -impl ServerInner { +impl ServiceBridge { pub fn process_message_received( &mut self, ctx: ConnectionHandlerContext, diff --git a/rocketmq-remoting/src/runtime/config.rs b/rocketmq-remoting/src/runtime/config.rs index 8e429154..5c606854 100644 --- a/rocketmq-remoting/src/runtime/config.rs +++ b/rocketmq-remoting/src/runtime/config.rs @@ -15,6 +15,6 @@ * limitations under the License. */ -mod client_config; +pub mod client_config; mod net_system_config; mod server_config; diff --git a/rocketmq-remoting/src/runtime/config/client_config.rs b/rocketmq-remoting/src/runtime/config/client_config.rs index 0cd24318..29a50191 100644 --- a/rocketmq-remoting/src/runtime/config/client_config.rs +++ b/rocketmq-remoting/src/runtime/config/client_config.rs @@ -23,7 +23,7 @@ lazy_static! { static ref NET_SYSTEM_CONFIG: NetSystemConfig = NetSystemConfig::new(); } -pub struct NettyClientConfig { +pub struct TokioClientConfig { // Worker thread number pub client_worker_threads: i32, pub client_callback_executor_threads: usize, @@ -47,9 +47,9 @@ pub struct NettyClientConfig { pub enable_transparent_retry: bool, } -impl Default for NettyClientConfig { +impl Default for TokioClientConfig { fn default() -> Self { - NettyClientConfig { + TokioClientConfig { client_worker_threads: NET_SYSTEM_CONFIG.client_worker_size, client_callback_executor_threads: num_cpus::get(), client_oneway_semaphore_value: NET_SYSTEM_CONFIG.client_oneway_semaphore_value, @@ -80,7 +80,7 @@ mod tests { #[test] fn test_default_config() { - let default_config = NettyClientConfig::default(); + let default_config = TokioClientConfig::default(); assert_eq!( default_config.client_worker_threads, diff --git a/rocketmq-remoting/src/server/rocketmq_server.rs b/rocketmq-remoting/src/server/rocketmq_server.rs index 14c3455d..028d1692 100644 --- a/rocketmq-remoting/src/server/rocketmq_server.rs +++ b/rocketmq-remoting/src/server/rocketmq_server.rs @@ -25,13 +25,13 @@ use tokio::{net::TcpListener, sync::broadcast, task::JoinHandle}; use crate::{ protocol::remoting_command::RemotingCommand, remoting::{InvokeCallback, RemotingService}, - runtime::{processor::RequestProcessor, server::run, RPCHook, ServerInner}, + runtime::{processor::RequestProcessor, server::run, RPCHook, ServiceBridge}, server::{config::BrokerServerConfig, RemotingServer}, }; pub struct RocketmqDefaultServer { pub(crate) broker_server_config: BrokerServerConfig, - pub(crate) server_inner: ServerInner, + pub(crate) server_inner: ServiceBridge, pub future: Option>, } @@ -39,7 +39,7 @@ impl RocketmqDefaultServer { pub fn new(broker_server_config: BrokerServerConfig) -> Self { Self { broker_server_config, - server_inner: ServerInner::new(), + server_inner: ServiceBridge::new(), future: None, } }