From e8bf1ae9bb436cb8426563ccbc9fb4ebb1d07538 Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 14 Mar 2024 23:27:00 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#274]=F0=9F=9A=80Implement=20remoting?= =?UTF-8?q?=20client-3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/clients/rocketmq_default_impl.rs | 25 +++++++++++++------ rocketmq-remoting/src/runtime.rs | 22 +++++++++++++--- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index 570cb4a42..2bfc50195 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -17,6 +17,7 @@ use std::{collections::HashMap, error::Error, sync::Arc}; use rocketmq_common::TokioExecutorService; +use tokio::sync::Mutex; use crate::{ clients::{Client, RemotingClient}, @@ -32,7 +33,7 @@ pub struct RocketmqDefaultClient { service_bridge: ServiceBridge, tokio_client_config: TokioClientConfig, //cache connection - connection_tables: HashMap, + connection_tables: HashMap>>, lock: std::sync::RwLock<()>, } @@ -48,19 +49,21 @@ impl RocketmqDefaultClient { } impl RocketmqDefaultClient { - fn get_and_create_client(&mut self, addr: String) -> &mut Client { + fn get_and_create_client(&mut self, addr: String) -> Arc> { let lc = self.lock.write().unwrap(); if self.connection_tables.contains_key(&addr) { - return self.connection_tables.get_mut(&addr).unwrap(); + return self.connection_tables.get(&addr).cloned().unwrap(); } let addr_inner = addr.clone(); let client = futures::executor::block_on(async move { Client::connect(addr_inner).await.unwrap() }); - self.connection_tables.insert(addr.clone(), client); + + self.connection_tables + .insert(addr.clone(), Arc::new(Mutex::new(client))); drop(lc); - self.connection_tables.get_mut(&addr).unwrap() + self.connection_tables.get(&addr).cloned().unwrap() } } @@ -103,7 +106,11 @@ impl RemotingClient for RocketmqDefaultClient { request: RemotingCommand, timeout_millis: u64, ) -> Result> { - todo!() + let client = self.get_and_create_client(addr.clone()); + Ok(self + .service_bridge + .invoke_sync(client, request, timeout_millis) + .unwrap()) } async fn invoke_async( @@ -114,8 +121,10 @@ impl RemotingClient for RocketmqDefaultClient { invoke_callback: impl InvokeCallback, ) -> Result<(), Box> { let client = self.get_and_create_client(addr.clone()); - - unreachable!() + self.service_bridge + .invoke_async(client, request, timeout_millis, invoke_callback) + .await; + Ok(()) } fn invoke_oneway( diff --git a/rocketmq-remoting/src/runtime.rs b/rocketmq-remoting/src/runtime.rs index 86ec5e685..790541785 100644 --- a/rocketmq-remoting/src/runtime.rs +++ b/rocketmq-remoting/src/runtime.rs @@ -18,7 +18,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use rocketmq_common::{common::Pair, TokioExecutorService}; -use tokio::time; +use tokio::{runtime::Runtime, sync::Mutex, time, time::timeout}; use crate::{ clients::Client, @@ -119,17 +119,33 @@ impl ServiceBridge { pub async fn invoke_async( &mut self, - client: &mut Client, + client: Arc>, request: RemotingCommand, timeout_millis: u64, invoke_callback: impl InvokeCallback, ) { if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async { - client.invoke(request).await.unwrap() + client.lock().await.invoke(request).await.unwrap() }) .await { invoke_callback.operation_succeed(resp) } } + + pub fn invoke_sync( + &mut self, + client: Arc>, + request: RemotingCommand, + timeout_millis: u64, + ) -> Option { + let remoting_command = Runtime::new().unwrap().block_on(async move { + let result = timeout(Duration::from_millis(timeout_millis), async move { + client.lock().await.invoke(request).await.unwrap() + }) + .await; + result.unwrap() + }); + Some(remoting_command) + } }