From 8a8e5e7071c055c016499fe09ee54c150860db3e Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 1 Jun 2024 09:30:27 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#415]=F0=9F=8E=A8Optmize=20code=20logi?= =?UTF-8?q?c?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/out_api/broker_outer_api.rs | 41 ++-- .../src/topic/manager/topic_config_manager.rs | 191 ++++++++++-------- .../src/clients/rocketmq_default_impl.rs | 21 +- 3 files changed, 133 insertions(+), 120 deletions(-) diff --git a/rocketmq-broker/src/out_api/broker_outer_api.rs b/rocketmq-broker/src/out_api/broker_outer_api.rs index 18b1cc24..5a99cee4 100644 --- a/rocketmq-broker/src/out_api/broker_outer_api.rs +++ b/rocketmq-broker/src/out_api/broker_outer_api.rs @@ -69,6 +69,26 @@ impl BrokerOuterAPI { name_server_address: None, } } + + fn create_request(broker_name: String, topic_config: TopicConfig) -> RemotingCommand { + let request_header = + RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().unwrap()); + let queue_data = QueueData::new( + broker_name.clone(), + topic_config.read_queue_nums, + topic_config.write_queue_nums, + topic_config.perm, + topic_config.topic_sys_flag, + ); + let topic_route_data = TopicRouteData { + queue_datas: vec![queue_data], + ..Default::default() + }; + let topic_route_body = topic_route_data.encode(); + + RemotingCommand::create_request_command(RequestCode::RegisterTopicInNamesrv, request_header) + .set_body(Some(topic_route_body)) + } } impl BrokerOuterAPI { @@ -184,26 +204,7 @@ impl BrokerOuterAPI { topic_config: TopicConfig, timeout_mills: u64, ) { - let request_header = - RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().unwrap()); - let queue_data = QueueData::new( - broker_name.clone(), - topic_config.read_queue_nums, - topic_config.write_queue_nums, - topic_config.perm, - topic_config.topic_sys_flag, - ); - let topic_route_data = TopicRouteData { - queue_datas: vec![queue_data], - ..Default::default() - }; - let topic_route_body = topic_route_data.encode(); - - let request = RemotingCommand::create_request_command( - RequestCode::RegisterTopicInNamesrv, - request_header, - ) - .set_body(Some(topic_route_body)); + let request = Self::create_request(broker_name, topic_config); let name_server_address_list = self.remoting_client.get_available_name_srv_list(); let mut handle_vec = Vec::with_capacity(name_server_address_list.len()); for namesrv_addr in name_server_address_list.iter() { diff --git a/rocketmq-broker/src/topic/manager/topic_config_manager.rs b/rocketmq-broker/src/topic/manager/topic_config_manager.rs index 7580b8fd..eb5fdeeb 100644 --- a/rocketmq-broker/src/topic/manager/topic_config_manager.rs +++ b/rocketmq-broker/src/topic/manager/topic_config_manager.rs @@ -262,81 +262,62 @@ impl TopicConfigManager { client_default_topic_queue_nums: i32, topic_sys_flag: u32, ) -> Option { - let mut create_new = false; - let lock = self + let (topic_config, create_new) = if let Some(_lock) = self .topic_config_table_lock - .try_lock_for(Duration::from_secs(3)); - let topic_config = if lock.is_some() { - let mut topic_config = self.get_topic_config(topic); - if topic_config.is_some() { - return topic_config; + .try_lock_for(Duration::from_secs(3)) + { + if let Some(topic_config) = self.get_topic_config(topic) { + return Some(topic_config); } - let mut default_topic_config = self.get_topic_config(default_topic); - let topic_config = if default_topic_config.is_some() { - //default topic + + if let Some(mut default_topic_config) = self.get_topic_config(default_topic) { if default_topic == TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC && !self.broker_config.auto_create_topic_enable { - default_topic_config.as_mut().unwrap().perm = - PermName::PERM_READ | PermName::PERM_WRITE; + default_topic_config.perm = PermName::PERM_READ | PermName::PERM_WRITE; } - if PermName::is_inherited(default_topic_config.as_ref().unwrap().perm) { - topic_config = Some(TopicConfig::new(topic)); - let mut queue_nums = client_default_topic_queue_nums - .min(default_topic_config.as_ref().unwrap().write_queue_nums as i32); - if queue_nums < 0 { - queue_nums = 0; - } - let ref_topic_config = topic_config.as_mut().unwrap(); - ref_topic_config.write_queue_nums = queue_nums as u32; - ref_topic_config.read_queue_nums = queue_nums as u32; - let mut perm = default_topic_config.as_ref().unwrap().perm; - perm &= !PermName::PERM_INHERIT; - ref_topic_config.perm = perm; - ref_topic_config.topic_sys_flag = topic_sys_flag; - ref_topic_config.topic_filter_type = - default_topic_config.as_ref().unwrap().topic_filter_type - } else { - warn!( - "Create new topic failed, because the default topic[{}] has no perm [{}] \ - producer:[{}]", - default_topic, - default_topic_config.as_ref().unwrap().perm, - remote_address - ); - } - - if topic_config.is_some() { + if PermName::is_inherited(default_topic_config.perm) { + let mut topic_config = TopicConfig::new(topic); + let queue_nums = client_default_topic_queue_nums + .min(default_topic_config.write_queue_nums as i32) + .max(0); + topic_config.write_queue_nums = queue_nums as u32; + topic_config.read_queue_nums = queue_nums as u32; + topic_config.perm = default_topic_config.perm & !PermName::PERM_INHERIT; + topic_config.topic_sys_flag = topic_sys_flag; + topic_config.topic_filter_type = default_topic_config.topic_filter_type; info!( "Create new topic by default topic:[{}] config:[{:?}] producer:[{}]", - default_topic, - topic_config.as_ref().unwrap(), - remote_address + default_topic, topic_config, remote_address ); - let _ = self.put_topic_config(topic_config.clone().unwrap()); + self.put_topic_config(topic_config.clone()); self.data_version.lock().next_version_with( self.message_store .as_ref() .unwrap() .get_state_machine_version(), ); - create_new = true; self.persist(); + (Some(topic_config), true) + } else { + warn!( + "Create new topic failed, because the default topic[{}] has no perm [{}] \ + producer:[{}]", + default_topic, default_topic_config.perm, remote_address + ); + (None, false) } - topic_config } else { - None - }; - topic_config + (None, false) + } } else { - None + (None, false) }; - drop(lock); + if create_new { self.register_broker_data(topic_config.as_ref().unwrap()); } - topic_config } @@ -348,52 +329,46 @@ impl TopicConfigManager { is_order: bool, topic_sys_flag: u32, ) -> Option { - let mut topic_config = self.get_topic_config(topic); - if let Some(ref mut config) = topic_config { + if let Some(ref mut config) = self.get_topic_config(topic) { if is_order != config.order { config.order = is_order; self.update_topic_config(config); } - return topic_config; + return Some(config.clone()); } - let mut create_new = false; - let lock = self + let (topic_config, create_new) = if let Some(_lock) = self .topic_config_table_lock - .try_lock_for(Duration::from_secs(3)); - let topic_config_result = if lock.is_some() { - topic_config = self.get_topic_config(topic); - if topic_config.is_some() { - return topic_config; - } - topic_config = Some(TopicConfig::new(topic)); - if let Some(ref mut config) = topic_config { - config.read_queue_nums = client_default_topic_queue_nums as u32; - config.write_queue_nums = client_default_topic_queue_nums as u32; - config.perm = perm; - config.topic_sys_flag = topic_sys_flag; - config.order = is_order; - info!("create new topic {:?}", config); - self.put_topic_config(config.clone()); - create_new = true; - self.data_version.lock().next_version_with( - self.message_store - .as_ref() - .unwrap() - .get_state_machine_version(), - ); - self.persist(); + .try_lock_for(Duration::from_secs(3)) + { + if let Some(config) = self.get_topic_config(topic) { + return Some(config); } - topic_config + + let mut config = TopicConfig::new(topic); + config.read_queue_nums = client_default_topic_queue_nums as u32; + config.write_queue_nums = client_default_topic_queue_nums as u32; + config.perm = perm; + config.topic_sys_flag = topic_sys_flag; + config.order = is_order; + + self.put_topic_config(config.clone()); + self.data_version.lock().next_version_with( + self.message_store + .as_ref() + .unwrap() + .get_state_machine_version(), + ); + self.persist(); + (Some(config), true) } else { - None + (None, false) }; - drop(lock); + if create_new { - self.register_broker_data(topic_config_result.as_ref().unwrap()); + self.register_broker_data(topic_config.as_ref().unwrap()); } - - topic_config_result + topic_config } fn register_broker_data(&mut self, topic_config: &TopicConfig) { @@ -471,6 +446,52 @@ impl TopicConfigManager { pub fn set_message_store(&mut self, message_store: Option) { self.message_store = message_store; } + + pub fn create_topic_of_tran_check_max_time( + &mut self, + client_default_topic_queue_nums: i32, + perm: u32, + ) -> Option { + if let Some(ref mut config) = + self.get_topic_config(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC) + { + return Some(config.clone()); + } + + let (topic_config, create_new) = if let Some(_lock) = self + .topic_config_table_lock + .try_lock_for(Duration::from_secs(3)) + { + if let Some(config) = + self.get_topic_config(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC) + { + return Some(config); + } + + let mut config = TopicConfig::new(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + config.read_queue_nums = client_default_topic_queue_nums as u32; + config.write_queue_nums = client_default_topic_queue_nums as u32; + config.perm = perm; + config.topic_sys_flag = 0; + info!("create new topic {:?}", config); + self.put_topic_config(config.clone()); + self.data_version.lock().next_version_with( + self.message_store + .as_ref() + .unwrap() + .get_state_machine_version(), + ); + self.persist(); + (Some(config), true) + } else { + (None, false) + }; + + if create_new { + self.register_broker_data(topic_config.as_ref().unwrap()); + } + topic_config + } } //Fully implemented will be removed diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index a65410c0..3dfd1efa 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -176,24 +176,15 @@ impl RemotingClient for RocketmqDefaultClient { timeout_millis: u64, ) -> Result { let client = self.get_and_create_client(addr.clone()); - /*if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async { - client.lock().await.send_read(request).await.unwrap() - }) - .await*/ - - match tokio::spawn(async move { - match time::timeout(Duration::from_millis(timeout_millis), async move { - client.lock().await.send_read(request).await.unwrap() - }) - .await - { - Ok(result) => Ok(result), - Err(err) => Err(RemotingError::RemoteException(err.to_string())), - } + match time::timeout(Duration::from_millis(timeout_millis), async move { + client.lock().await.send_read(request).await }) .await { - Ok(result) => result, + Ok(result) => match result { + Ok(response) => Ok(response), + Err(err) => Err(RemotingError::RemoteException(err.to_string())), + }, Err(err) => Err(RemotingError::RemoteException(err.to_string())), } }