Skip to content

Commit e3fa011

Browse files
committed
[ISSUE #2378]🔥Implement RocketmqDefaultClient shutdown method🚀
1 parent 438ff1f commit e3fa011

File tree

1 file changed

+44
-21
lines changed

1 file changed

+44
-21
lines changed

rocketmq-remoting/src/clients/rocketmq_default_impl.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use rand::Rng;
2525
use rocketmq_runtime::RocketMQRuntime;
2626
use rocketmq_rust::ArcMut;
2727
use rocketmq_rust::WeakArcMut;
28+
use tokio::runtime::Handle;
2829
use tokio::sync::Mutex;
2930
use tokio::time;
3031
use tracing::debug;
@@ -56,7 +57,7 @@ pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
5657
namesrv_addr_choosed: ArcMut<Option<CheetahString>>,
5758
available_namesrv_addr_set: ArcMut<HashSet<CheetahString>>,
5859
namesrv_index: Arc<AtomicI32>,
59-
client_runtime: Arc<RocketMQRuntime>,
60+
client_runtime: Option<RocketMQRuntime>,
6061
processor: PR,
6162
tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
6263
}
@@ -77,7 +78,7 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
7778
namesrv_addr_choosed: ArcMut::new(Default::default()),
7879
available_namesrv_addr_set: ArcMut::new(Default::default()),
7980
namesrv_index: Arc::new(AtomicI32::new(init_value_index())),
80-
client_runtime: Arc::new(RocketMQRuntime::new_multi(10, "client-thread")),
81+
client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")),
8182
processor,
8283
tx,
8384
}
@@ -241,17 +242,33 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for Rocketmq
241242
async fn start(&self, this: WeakArcMut<Self>) {
242243
if let Some(client) = this.upgrade() {
243244
let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
244-
self.client_runtime.get_handle().spawn(async move {
245-
loop {
246-
client.scan_available_name_srv().await;
247-
time::sleep(Duration::from_millis(connect_timeout_millis)).await;
248-
}
249-
});
245+
self.client_runtime
246+
.as_ref()
247+
.unwrap()
248+
.get_handle()
249+
.spawn(async move {
250+
loop {
251+
client.scan_available_name_srv().await;
252+
time::sleep(Duration::from_millis(connect_timeout_millis)).await;
253+
}
254+
});
250255
}
251256
}
252257

253258
fn shutdown(&mut self) {
254-
todo!()
259+
if let Some(rt) = self.client_runtime.take() {
260+
rt.shutdown();
261+
}
262+
let connection_tables = self.connection_tables.clone();
263+
tokio::task::block_in_place(move || {
264+
Handle::current().block_on(async move {
265+
connection_tables.lock().await.clear();
266+
});
267+
});
268+
self.namesrv_addr_list.clear();
269+
self.available_namesrv_addr_set.clear();
270+
271+
info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<");
255272
}
256273

257274
fn register_rpc_hook(&mut self, hook: Arc<Box<dyn RPCHook>>) {
@@ -337,6 +354,8 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
337354
Some(mut client) => {
338355
match self
339356
.client_runtime
357+
.as_ref()
358+
.unwrap()
340359
.get_handle()
341360
.spawn(async move {
342361
time::timeout(Duration::from_millis(timeout_millis), async move {
@@ -371,18 +390,22 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
371390
error!("get client failed");
372391
}
373392
Some(mut client) => {
374-
self.client_runtime.get_handle().spawn(async move {
375-
match time::timeout(Duration::from_millis(timeout_millis), async move {
376-
let mut request = request;
377-
request.mark_oneway_rpc_ref();
378-
client.send(request).await
379-
})
380-
.await
381-
{
382-
Ok(_) => Ok(()),
383-
Err(err) => Err(RemotingError::RemoteError(err.to_string())),
384-
}
385-
});
393+
self.client_runtime
394+
.as_ref()
395+
.unwrap()
396+
.get_handle()
397+
.spawn(async move {
398+
match time::timeout(Duration::from_millis(timeout_millis), async move {
399+
let mut request = request;
400+
request.mark_oneway_rpc_ref();
401+
client.send(request).await
402+
})
403+
.await
404+
{
405+
Ok(_) => Ok(()),
406+
Err(err) => Err(RemotingError::RemoteError(err.to_string())),
407+
}
408+
});
386409
}
387410
}
388411
}

0 commit comments

Comments
 (0)