Skip to content

[ISSUE #270]✨Implement remoting client-1 #271

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions rocketmq-common/src/common/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ struct CompletableFutureState<T> {
waker: Option<Waker>,
/// The data value contained within the CompletableFuture upon completion.
data: Option<T>,

/// An optional error value contained within the CompletableFuture upon completion.
error: Option<Box<dyn std::error::Error + Send + Sync>>,
}

/// A CompletableFuture represents a future value that may be completed or pending.
Expand All @@ -64,6 +67,7 @@ impl<T: Send + 'static> CompletableFuture<T> {
completed: State::Pending,
waker: None,
data: None,
error: None,
}));
let arc = status.clone();

Expand Down Expand Up @@ -101,6 +105,15 @@ impl<T: Send + 'static> CompletableFuture<T> {
waker.wake();
}
}

pub fn complete_exceptionally(&mut self, error: Box<dyn std::error::Error + Send + Sync>) {
let mut state = self.state.lock().unwrap();
state.completed = State::Ready;
state.error = Some(error);
if let Some(waker) = state.waker.take() {
waker.wake();
}
}
}

impl<T> std::future::Future for CompletableFuture<T> {
Expand Down
60 changes: 45 additions & 15 deletions rocketmq-remoting/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{

pub use blocking_client::BlockingClient;
pub use client::Client;
use rocketmq_common::TokioExecutorService;
use rocketmq_common::{common::future::CompletableFuture, TokioExecutorService};

use crate::{
net::ResponseFuture,
Expand All @@ -37,6 +37,7 @@ mod async_client;
mod blocking_client;

mod client;
mod rocketmq_default_impl;

#[derive(Default)]
pub struct RemoteClient {
Expand Down Expand Up @@ -91,30 +92,48 @@ trait RemotingClient: RemotingService {
request: RemotingCommand,
timeout_millis: u64,
) -> Result<RemotingCommand, Box<dyn std::error::Error>>;
fn invoke_async(

async fn invoke_async(
&mut self,
addr: String,
request: RemotingCommand,
timeout_millis: u64,
invoke_callback: Arc<dyn InvokeCallback>,
invoke_callback: impl InvokeCallback,
) -> Result<(), Box<dyn std::error::Error>>;

fn invoke_oneway(
&self,
addr: String,
request: RemotingCommand,
timeout_millis: u64,
) -> Result<(), Box<dyn std::error::Error>>;

fn invoke(
async fn invoke(
&mut self,
addr: String,
request: RemotingCommand,
timeout_millis: u64,
) -> Result<RemotingCommand, Box<dyn std::error::Error>> {
let future = Arc::new(DefaultInvokeCallback {});
match self.invoke_async(addr, request, timeout_millis, future) {
Ok(_) => Ok(RemotingCommand::default()),
Err(e) => Err(e),
) -> Result<CompletableFuture<RemotingCommand>, Box<dyn std::error::Error>> {
let completable_future = CompletableFuture::new();
let sender = completable_future.get_sender();
match self
.invoke_async(
addr,
request,
timeout_millis,
|response: Option<RemotingCommand>,
error: Option<Box<dyn std::error::Error>>,
_response_future: Option<ResponseFuture>| {
if let Some(response) = response {
let _ = sender.blocking_send(response);
} else if let Some(_error) = error {
}
},
)
.await
{
Ok(_) => Ok(completable_future),
Err(err) => Err(err),
}
}

Expand All @@ -128,14 +147,25 @@ trait RemotingClient: RemotingService {
fn set_callback_executor(&mut self, executor: Arc<TokioExecutorService>);

fn is_address_reachable(&mut self, addr: String);
}

struct DefaultInvokeCallback;
fn close_clients(&mut self, addrs: Vec<String>);
}

impl InvokeCallback for DefaultInvokeCallback {
fn operation_complete(&self, _response_future: ResponseFuture) {}
impl<T> InvokeCallback for T
where
T: Fn(Option<RemotingCommand>, Option<Box<dyn std::error::Error>>, Option<ResponseFuture>)
+ Send
+ Sync,
{
fn operation_complete(&self, response_future: ResponseFuture) {
self(None, None, Some(response_future))
}

fn operation_succeed(&self, _response: RemotingCommand) {}
fn operation_succeed(&self, response: RemotingCommand) {
self(Some(response), None, None)
}

fn operation_fail(&self, _throwable: Box<dyn std::error::Error>) {}
fn operation_fail(&self, throwable: Box<dyn std::error::Error>) {
self(None, Some(throwable), None)
}
}
126 changes: 126 additions & 0 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{error::Error, sync::Arc};

use rocketmq_common::TokioExecutorService;

use crate::{
clients::RemotingClient,
protocol::remoting_command::RemotingCommand,
remoting::{InvokeCallback, RemotingService},
runtime::{
config::client_config::TokioClientConfig, processor::RequestProcessor, RPCHook,
ServiceBridge,
},
};

pub struct RocketmqDefaultClient {
service_bridge: ServiceBridge,
tokio_client_config: TokioClientConfig,
}

impl RocketmqDefaultClient {
pub fn new(tokio_client_config: TokioClientConfig) -> Self {
Self {
service_bridge: ServiceBridge::new(),
tokio_client_config,
}
}
}

#[allow(unused_variables)]
impl RemotingService for RocketmqDefaultClient {
async fn start(&mut self) {
todo!()
}

fn shutdown(&mut self) {
todo!()
}

fn register_rpc_hook(&mut self, hook: impl RPCHook) {
todo!()
}

fn clear_rpc_hook(&mut self) {
todo!()
}
}

#[allow(unused_variables)]
impl RemotingClient for RocketmqDefaultClient {
fn update_name_server_address_list(&mut self, addrs: Vec<String>) {
todo!()
}

fn get_name_server_address_list(&self) -> Vec<String> {
todo!()
}

fn get_available_name_srv_list(&self) -> Vec<String> {
todo!()
}

fn invoke_sync(
&mut self,
addr: String,
request: RemotingCommand,
timeout_millis: u64,
) -> Result<RemotingCommand, Box<dyn Error>> {
todo!()
}

async fn invoke_async(
&mut self,
addr: String,
request: RemotingCommand,
timeout_millis: u64,
invoke_callback: impl InvokeCallback,
) -> Result<(), Box<dyn Error>> {
todo!()
}

fn invoke_oneway(
&self,
addr: String,
request: RemotingCommand,
timeout_millis: u64,
) -> Result<(), Box<dyn Error>> {
todo!()
}

fn register_processor(
&mut self,
request_code: i32,
processor: impl RequestProcessor + Send + Sync + 'static,
executor: Arc<TokioExecutorService>,
) {
todo!()
}

fn set_callback_executor(&mut self, executor: Arc<TokioExecutorService>) {
todo!()
}

fn is_address_reachable(&mut self, addr: String) {
todo!()
}

fn close_clients(&mut self, addrs: Vec<String>) {
todo!()
}
}
10 changes: 5 additions & 5 deletions rocketmq-remoting/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

mod config;
pub mod config;
pub mod processor;
pub mod server;

Expand All @@ -46,7 +46,7 @@ pub trait RPCHook: Send + Sync + 'static {
);
}

pub struct ServerInner {
pub struct ServiceBridge {
//Limiting the maximum number of one-way requests.
pub(crate) semaphore_oneway: tokio::sync::Semaphore,
//Limiting the maximum number of asynchronous requests.
Expand All @@ -67,7 +67,7 @@ pub struct ServerInner {
pub(crate) rpc_hooks: Vec<Box<dyn RPCHook>>,
}

impl ServerInner {
impl ServiceBridge {
pub fn new() -> Self {
Self {
semaphore_oneway: tokio::sync::Semaphore::new(1000),
Expand All @@ -82,13 +82,13 @@ impl ServerInner {
}
}

impl Default for ServerInner {
impl Default for ServiceBridge {
fn default() -> Self {
Self::new()
}
}

impl ServerInner {
impl ServiceBridge {
pub fn process_message_received(
&mut self,
ctx: ConnectionHandlerContext,
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-remoting/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
* limitations under the License.
*/

mod client_config;
pub mod client_config;
mod net_system_config;
mod server_config;
8 changes: 4 additions & 4 deletions rocketmq-remoting/src/runtime/config/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lazy_static! {
static ref NET_SYSTEM_CONFIG: NetSystemConfig = NetSystemConfig::new();
}

pub struct NettyClientConfig {
pub struct TokioClientConfig {
// Worker thread number
pub client_worker_threads: i32,
pub client_callback_executor_threads: usize,
Expand All @@ -47,9 +47,9 @@ pub struct NettyClientConfig {
pub enable_transparent_retry: bool,
}

impl Default for NettyClientConfig {
impl Default for TokioClientConfig {
fn default() -> Self {
NettyClientConfig {
TokioClientConfig {
client_worker_threads: NET_SYSTEM_CONFIG.client_worker_size,
client_callback_executor_threads: num_cpus::get(),
client_oneway_semaphore_value: NET_SYSTEM_CONFIG.client_oneway_semaphore_value,
Expand Down Expand Up @@ -80,7 +80,7 @@ mod tests {

#[test]
fn test_default_config() {
let default_config = NettyClientConfig::default();
let default_config = TokioClientConfig::default();

assert_eq!(
default_config.client_worker_threads,
Expand Down
6 changes: 3 additions & 3 deletions rocketmq-remoting/src/server/rocketmq_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ use tokio::{net::TcpListener, sync::broadcast, task::JoinHandle};
use crate::{
protocol::remoting_command::RemotingCommand,
remoting::{InvokeCallback, RemotingService},
runtime::{processor::RequestProcessor, server::run, RPCHook, ServerInner},
runtime::{processor::RequestProcessor, server::run, RPCHook, ServiceBridge},
server::{config::BrokerServerConfig, RemotingServer},
};

pub struct RocketmqDefaultServer {
pub(crate) broker_server_config: BrokerServerConfig,
pub(crate) server_inner: ServerInner,
pub(crate) server_inner: ServiceBridge,
pub future: Option<JoinHandle<()>>,
}

impl RocketmqDefaultServer {
pub fn new(broker_server_config: BrokerServerConfig) -> Self {
Self {
broker_server_config,
server_inner: ServerInner::new(),
server_inner: ServiceBridge::new(),
future: None,
}
}
Expand Down