Skip to content

Commit e8bf1ae

Browse files
committed
[ISSUE #274]🚀Implement remoting client-3
1 parent 0b5ca38 commit e8bf1ae

File tree

2 files changed

+36
-11
lines changed

2 files changed

+36
-11
lines changed

rocketmq-remoting/src/clients/rocketmq_default_impl.rs

+17-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use std::{collections::HashMap, error::Error, sync::Arc};
1818

1919
use rocketmq_common::TokioExecutorService;
20+
use tokio::sync::Mutex;
2021

2122
use crate::{
2223
clients::{Client, RemotingClient},
@@ -32,7 +33,7 @@ pub struct RocketmqDefaultClient {
3233
service_bridge: ServiceBridge,
3334
tokio_client_config: TokioClientConfig,
3435
//cache connection
35-
connection_tables: HashMap<String /* ip:port */, Client>,
36+
connection_tables: HashMap<String /* ip:port */, Arc<Mutex<Client>>>,
3637
lock: std::sync::RwLock<()>,
3738
}
3839

@@ -48,19 +49,21 @@ impl RocketmqDefaultClient {
4849
}
4950

5051
impl RocketmqDefaultClient {
51-
fn get_and_create_client(&mut self, addr: String) -> &mut Client {
52+
fn get_and_create_client(&mut self, addr: String) -> Arc<Mutex<Client>> {
5253
let lc = self.lock.write().unwrap();
5354

5455
if self.connection_tables.contains_key(&addr) {
55-
return self.connection_tables.get_mut(&addr).unwrap();
56+
return self.connection_tables.get(&addr).cloned().unwrap();
5657
}
5758

5859
let addr_inner = addr.clone();
5960
let client =
6061
futures::executor::block_on(async move { Client::connect(addr_inner).await.unwrap() });
61-
self.connection_tables.insert(addr.clone(), client);
62+
63+
self.connection_tables
64+
.insert(addr.clone(), Arc::new(Mutex::new(client)));
6265
drop(lc);
63-
self.connection_tables.get_mut(&addr).unwrap()
66+
self.connection_tables.get(&addr).cloned().unwrap()
6467
}
6568
}
6669

@@ -103,7 +106,11 @@ impl RemotingClient for RocketmqDefaultClient {
103106
request: RemotingCommand,
104107
timeout_millis: u64,
105108
) -> Result<RemotingCommand, Box<dyn Error>> {
106-
todo!()
109+
let client = self.get_and_create_client(addr.clone());
110+
Ok(self
111+
.service_bridge
112+
.invoke_sync(client, request, timeout_millis)
113+
.unwrap())
107114
}
108115

109116
async fn invoke_async(
@@ -114,8 +121,10 @@ impl RemotingClient for RocketmqDefaultClient {
114121
invoke_callback: impl InvokeCallback,
115122
) -> Result<(), Box<dyn Error>> {
116123
let client = self.get_and_create_client(addr.clone());
117-
118-
unreachable!()
124+
self.service_bridge
125+
.invoke_async(client, request, timeout_millis, invoke_callback)
126+
.await;
127+
Ok(())
119128
}
120129

121130
fn invoke_oneway(

rocketmq-remoting/src/runtime.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::{collections::HashMap, sync::Arc, time::Duration};
1919

2020
use rocketmq_common::{common::Pair, TokioExecutorService};
21-
use tokio::time;
21+
use tokio::{runtime::Runtime, sync::Mutex, time, time::timeout};
2222

2323
use crate::{
2424
clients::Client,
@@ -119,17 +119,33 @@ impl ServiceBridge {
119119

120120
pub async fn invoke_async(
121121
&mut self,
122-
client: &mut Client,
122+
client: Arc<Mutex<Client>>,
123123
request: RemotingCommand,
124124
timeout_millis: u64,
125125
invoke_callback: impl InvokeCallback,
126126
) {
127127
if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async {
128-
client.invoke(request).await.unwrap()
128+
client.lock().await.invoke(request).await.unwrap()
129129
})
130130
.await
131131
{
132132
invoke_callback.operation_succeed(resp)
133133
}
134134
}
135+
136+
pub fn invoke_sync(
137+
&mut self,
138+
client: Arc<Mutex<Client>>,
139+
request: RemotingCommand,
140+
timeout_millis: u64,
141+
) -> Option<RemotingCommand> {
142+
let remoting_command = Runtime::new().unwrap().block_on(async move {
143+
let result = timeout(Duration::from_millis(timeout_millis), async move {
144+
client.lock().await.invoke(request).await.unwrap()
145+
})
146+
.await;
147+
result.unwrap()
148+
});
149+
Some(remoting_command)
150+
}
135151
}

0 commit comments

Comments
 (0)