Skip to content

[ISSUE #290]🎨Optimize RocketmqDefaultClient code #291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
pub struct BrokerOuterAPI {
remoting_client: RocketmqDefaultClient,
name_server_address: Option<String>,
broker_outer_executor: TokioExecutorService,
broker_outer_executor: Option<TokioExecutorService>,
}

impl BrokerOuterAPI {
Expand Down Expand Up @@ -66,13 +66,14 @@
}

impl BrokerOuterAPI {
pub fn update_name_server_address_list(&mut self, addrs: String) {
pub async fn update_name_server_address_list(&mut self, addrs: String) {

Check warning on line 69 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L69

Added line #L69 was not covered by tests
let addr_vec = addrs
.split("';'")
.map(|s| s.to_string())
.collect::<Vec<String>>();
self.remoting_client
.update_name_server_address_list(addr_vec)
.await
}

pub async fn register_broker_all(
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-remoting/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl RemoteClient {

#[allow(async_fn_in_trait)]
pub trait RemotingClient: RemotingService {
fn update_name_server_address_list(&mut self, addrs: Vec<String>);
async fn update_name_server_address_list(&mut self, addrs: Vec<String>);

fn get_name_server_address_list(&self) -> Vec<String>;

Expand Down
71 changes: 22 additions & 49 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@
service_bridge: ServiceBridge,
tokio_client_config: TokioClientConfig,
//cache connection
connection_tables: HashMap<String /* ip:port */, Client>,
connection_tables_lock: std::sync::RwLock<()>,
lock: tokio::sync::RwLock<()>,
namesrv_addr_list: Arc<std::sync::Mutex<Vec<String>>>,
namesrv_addr_choosed: Arc<std::sync::Mutex<Option<String>>>,
connection_tables:
tokio::sync::Mutex<HashMap<String /* ip:port */, Arc<tokio::sync::Mutex<Client>>>>,
namesrv_addr_list: Arc<tokio::sync::Mutex<Vec<String>>>,
namesrv_addr_choosed: Arc<tokio::sync::Mutex<Option<String>>>,
}

impl RocketmqDefaultClient {
Expand All @@ -46,42 +45,23 @@
service_bridge: ServiceBridge::new(),
tokio_client_config,
connection_tables: Default::default(),
connection_tables_lock: Default::default(),
lock: Default::default(),
namesrv_addr_list: Arc::new(Default::default()),
namesrv_addr_choosed: Arc::new(Default::default()),
}
}
}

impl RocketmqDefaultClient {
async fn get_and_create_client(&mut self, addr: String) -> &Client {
let lc = self.lock.write().await;
if self.connection_tables.contains_key(&addr) {
return self.connection_tables.get(&addr).unwrap();
async fn get_and_create_client(&mut self, addr: String) -> Arc<tokio::sync::Mutex<Client>> {
let mut mutex_guard = self.connection_tables.lock().await;
if mutex_guard.contains_key(&addr) {
return mutex_guard.get(&addr).unwrap().clone();

Check warning on line 58 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L55-L58

Added lines #L55 - L58 were not covered by tests
}

let addr_inner = addr.clone();
let client = Client::connect(addr_inner).await.unwrap();

self.connection_tables.insert(addr.clone(), client);
drop(lc);
self.connection_tables.get(&addr).unwrap()
}

async fn get_and_create_client_mut(&mut self, addr: String) -> Option<&mut Client> {
let lc = self.lock.write().await;

if self.connection_tables.contains_key(&addr) {
return self.connection_tables.get_mut(&addr);
}

let addr_inner = addr.clone();
let client = Client::connect(addr_inner).await.unwrap();

self.connection_tables.insert(addr.clone(), client);
drop(lc);
self.connection_tables.get_mut(&addr)
mutex_guard.insert(addr.clone(), Arc::new(tokio::sync::Mutex::new(client)));
mutex_guard.get(&addr).unwrap().clone()

Check warning on line 64 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L63-L64

Added lines #L63 - L64 were not covered by tests
}
}

Expand All @@ -106,8 +86,8 @@

#[allow(unused_variables)]
impl RemotingClient for RocketmqDefaultClient {
fn update_name_server_address_list(&mut self, addrs: Vec<String>) {
let mut old = self.namesrv_addr_list.lock().unwrap();
async fn update_name_server_address_list(&mut self, addrs: Vec<String>) {

Check warning on line 89 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L89

Added line #L89 was not covered by tests
let mut old = self.namesrv_addr_list.lock().await;
let mut update = false;

if !addrs.is_empty() {
Expand All @@ -134,19 +114,18 @@
old.clone_from(&addrs);

// should close the channel if choosed addr is not exist.
if let Some(namesrv_addr) = self.namesrv_addr_choosed.lock().unwrap().as_ref() {
if let Some(namesrv_addr) = self.namesrv_addr_choosed.lock().await.as_ref() {
if !addrs.contains(namesrv_addr) {
let write_guard = self.connection_tables_lock.write().unwrap();
let mut remove_vec = Vec::new();
for (addr, client) in self.connection_tables.iter() {
let mut result = self.connection_tables.lock().await;
for (addr, client) in result.iter() {
if addr.contains(namesrv_addr) {
remove_vec.push(addr.clone());
}
}
for addr in &remove_vec {
self.connection_tables.remove(addr);
result.remove(addr);
}
drop(write_guard);
}
}
}
Expand All @@ -167,12 +146,9 @@
request: RemotingCommand,
timeout_millis: u64,
) -> RemotingCommand {
let client = self
.get_and_create_client_mut(addr.clone())
.await
.take()
.unwrap();
ServiceBridge::invoke_sync(client, request, timeout_millis)
let client = self.get_and_create_client(addr.clone()).await;
let client_ref = &mut *client.lock().await;
ServiceBridge::invoke_sync(client_ref, request, timeout_millis)

Check warning on line 151 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L149-L151

Added lines #L149 - L151 were not covered by tests
.await
.unwrap()
}
Expand All @@ -184,12 +160,9 @@
timeout_millis: u64,
invoke_callback: impl InvokeCallback,
) -> Result<(), Box<dyn Error>> {
let client = self
.get_and_create_client_mut(addr.clone())
.await
.take()
.unwrap();
ServiceBridge::invoke_async(client, request, timeout_millis, invoke_callback).await;
let client = self.get_and_create_client(addr.clone()).await;
let client_ref = &mut *client.lock().await;
ServiceBridge::invoke_async(client_ref, request, timeout_millis, invoke_callback).await;
Ok(())
}

Expand Down
Loading