Skip to content

Commit f866ec1

Browse files
authored
[ISSUE #288]🎨Optimize ArcProcessorTable code (#289)
* [ISSUE #288]🎨Optimize ArcProcessorTable code * remove useless file
1 parent b60cd20 commit f866ec1

File tree

2 files changed

+8
-18
lines changed

2 files changed

+8
-18
lines changed

rocketmq-namesrv/src/bin/bootstrap_server.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use rocketmq_namesrv::{
2828
};
2929
use rocketmq_remoting::{
3030
code::request_code::RequestCode,
31-
runtime::{processor::RequestProcessor, server},
31+
runtime::{processor::RequestProcessor, server, ArcProcessorTable},
3232
};
3333
use rocketmq_rust::rocketmq;
3434
use tokio::{net::TcpListener, sync::broadcast, task::JoinHandle};
@@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
6262
listener,
6363
tokio::signal::ctrl_c(),
6464
Arc::new(Box::new(default_request_processor)),
65-
Arc::new(processor_table),
65+
processor_table,
6666
Some(notify_conn_disconnect),
6767
)
6868
.await;
@@ -71,7 +71,7 @@ async fn main() -> anyhow::Result<()> {
7171
}
7272

7373
type InitProcessorsReturn = (
74-
HashMap<i32, Box<dyn RequestProcessor + Send + Sync + 'static>>,
74+
ArcProcessorTable,
7575
DefaultRequestProcessor,
7676
ScheduledExecutorService,
7777
JoinHandle<()>,
@@ -86,16 +86,16 @@ fn init_processors(
8686
let route_info_manager_inner = Arc::new(parking_lot::RwLock::new(route_info_manager));
8787
let handle = RouteInfoManager::start(route_info_manager_inner.clone(), receiver);
8888
let kvconfig_manager_inner = Arc::new(parking_lot::RwLock::new(kvconfig_manager));
89-
let mut processors: HashMap<i32, Box<dyn RequestProcessor + Send + Sync + 'static>> =
89+
let mut processors: HashMap<i32, Arc<Box<dyn RequestProcessor + Send + Sync + 'static>>> =
9090
HashMap::new();
9191

9292
processors.insert(
9393
RequestCode::GetRouteinfoByTopic.to_i32(),
94-
Box::new(ClientRequestProcessor::new(
94+
Arc::new(Box::new(ClientRequestProcessor::new(
9595
route_info_manager_inner.clone(),
9696
namesrv_config.clone(),
9797
kvconfig_manager_inner.clone(),
98-
)),
98+
))),
9999
);
100100
let scheduled_executor_service = ScheduledExecutorService::new_with_config(
101101
1,

rocketmq-remoting/src/runtime.rs

+2-12
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use std::{collections::HashMap, sync::Arc, time::Duration};
1919

20-
use rocketmq_common::{common::Pair, TokioExecutorService};
2120
use tokio::{time, time::timeout};
2221

2322
use crate::{
@@ -34,7 +33,7 @@ pub mod server;
3433

3534
pub type ArcDefaultRequestProcessor = Arc<Box<dyn RequestProcessor + Send + Sync + 'static>>;
3635

37-
pub type ArcProcessorTable = Arc<HashMap<i32, Box<dyn RequestProcessor + Sync + Send + 'static>>>;
36+
pub type ArcProcessorTable = HashMap<i32, ArcDefaultRequestProcessor>;
3837

3938
pub trait RPCHook: Send + Sync + 'static {
4039
fn do_before_request(&self, remote_addr: &str, request: &RemotingCommand);
@@ -58,13 +57,6 @@ pub struct ServiceBridge {
5857
pub(crate) processor_table: Option<ArcProcessorTable>,
5958
pub(crate) default_request_processor_pair: Option<ArcDefaultRequestProcessor>,
6059

61-
pub(crate) processor_table1: HashMap<
62-
i32, /* request code */
63-
Pair<Box<dyn RequestProcessor + Send + Sync>, TokioExecutorService>,
64-
>,
65-
pub(crate) default_request_processor_pair1:
66-
Option<Pair<Box<dyn RequestProcessor + Send + Sync>, TokioExecutorService>>,
67-
6860
pub(crate) rpc_hooks: Vec<Box<dyn RPCHook>>,
6961
}
7062

@@ -74,10 +66,8 @@ impl ServiceBridge {
7466
semaphore_oneway: tokio::sync::Semaphore::new(1000),
7567
semaphore_async: tokio::sync::Semaphore::new(1000),
7668
response_table: HashMap::new(),
77-
processor_table: Some(Arc::new(HashMap::new())),
69+
processor_table: Some(HashMap::new()),
7870
default_request_processor_pair: None,
79-
processor_table1: Default::default(),
80-
default_request_processor_pair1: None,
8171
rpc_hooks: Vec::new(),
8272
}
8373
}

0 commit comments

Comments
 (0)