diff --git a/rocketmq-broker/src/broker_config.rs b/rocketmq-broker/src/broker_config.rs index 0ac14d44..cdbd7c31 100644 --- a/rocketmq-broker/src/broker_config.rs +++ b/rocketmq-broker/src/broker_config.rs @@ -49,6 +49,10 @@ pub struct BrokerConfig { pub broker_permission: i8, pub async_send_enable: bool, pub store_path_root_dir: String, + pub enable_split_registration: bool, + pub split_registration_size: i32, + pub register_broker_timeout_mills: i32, + pub is_in_broker_container: bool, } impl Default for BrokerConfig { @@ -80,6 +84,10 @@ impl Default for BrokerConfig { .join("store") .to_string_lossy() .into_owned(), + enable_split_registration: false, + split_registration_size: 800, + register_broker_timeout_mills: 24000, + is_in_broker_container: false, } } } diff --git a/rocketmq-broker/src/broker_controller.rs b/rocketmq-broker/src/broker_controller.rs index fd4d3d98..caa33e03 100644 --- a/rocketmq-broker/src/broker_controller.rs +++ b/rocketmq-broker/src/broker_controller.rs @@ -14,11 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; -use rocketmq_common::{common::config_manager::ConfigManager, TokioExecutorService}; +use rocketmq_common::{ + common::{config::TopicConfig, config_manager::ConfigManager, constant::PermName}, + TokioExecutorService, +}; use rocketmq_remoting::{ code::request_code::RequestCode, + protocol::{ + body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, + static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail, + }, remoting::RemotingService, server::{rocketmq_server::RocketmqDefaultServer, RemotingServer}, }; @@ -160,6 +167,7 @@ impl BrokerController { } } +#[allow(unused_variables)] impl BrokerController { pub async fn start(&mut self) { if self.message_store.as_mut().is_some() { @@ -278,6 +286,104 @@ impl BrokerController { fn initial_acl(&mut self) {} fn initial_rpc_hooks(&mut self) {} + + fn register_broker_all( + &mut self, + check_order_config: bool, + oneway: bool, + force_register: bool, + ) { + let mut topic_config_table = HashMap::new(); + for topic_config in self.topic_config_manager_inner.topic_config_table.values() { + let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission) + || !PermName::is_readable(self.broker_config.broker_permission) + { + TopicConfig { + topic_name: topic_config.topic_name.clone(), + read_queue_nums: topic_config.read_queue_nums, + write_queue_nums: topic_config.write_queue_nums, + perm: topic_config.perm & self.broker_config.broker_permission as u32, + ..TopicConfig::default() + } + } else { + topic_config.clone() + }; + topic_config_table.insert(new_topic_config.topic_name.clone(), new_topic_config); + } + + // Handle split registration logic + if self.broker_config.enable_split_registration + && topic_config_table.len() as i32 >= self.broker_config.split_registration_size + { + let topic_config_wrapper = self + .topic_config_manager_inner + .build_serialize_wrapper(topic_config_table.clone()); + self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper); + } + + // Collect topicQueueMappingInfoMap + let topic_queue_mapping_info_map = self + .topic_queue_mapping_manager + .topic_queue_mapping_table + .iter() + .map(|(key, value)| { + ( + key.clone(), + TopicQueueMappingDetail::clone_as_mapping_info(value), + ) + }) + .collect(); + + let topic_config_wrapper = self + .topic_config_manager_inner + .build_serialize_wrapper_with_topic_queue_map( + topic_config_table, + topic_queue_mapping_info_map, + ); + + if self.broker_config.enable_split_registration + || force_register + || self.need_register( + self.broker_config + .broker_identity + .broker_cluster_name + .clone() + .as_str(), + self.broker_config.broker_ip1.clone().as_str(), + self.broker_config + .broker_identity + .broker_name + .clone() + .as_str(), + self.broker_config.broker_identity.broker_id, + self.broker_config.register_broker_timeout_mills, + self.broker_config.is_in_broker_container, + ) + { + self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper); + } + } + + fn need_register( + &mut self, + cluster_name: &str, + broker_addr: &str, + broker_name: &str, + broker_id: u64, + register_timeout_mills: i32, + in_broker_container: bool, + ) -> bool { + unimplemented!() + } + + fn do_register_broker_all( + &mut self, + check_order_config: bool, + oneway: bool, + topic_config_wrapper: TopicConfigAndMappingSerializeWrapper, + ) { + unimplemented!() + } } impl Drop for BrokerController { diff --git a/rocketmq-broker/src/topic/manager/topic_config_manager.rs b/rocketmq-broker/src/topic/manager/topic_config_manager.rs index 70cef8f2..94ff23ff 100644 --- a/rocketmq-broker/src/topic/manager/topic_config_manager.rs +++ b/rocketmq-broker/src/topic/manager/topic_config_manager.rs @@ -22,7 +22,11 @@ use rocketmq_common::common::{ topic::TopicValidator, }; use rocketmq_remoting::protocol::{ - body::topic_info_wrapper::TopicConfigSerializeWrapper, DataVersion, + body::topic_info_wrapper::{ + topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, TopicConfigSerializeWrapper, + }, + static_topic::topic_queue_info::TopicQueueMappingInfo, + DataVersion, }; use tracing::info; @@ -180,6 +184,28 @@ impl TopicConfigManager { pub fn select_topic_config(&self, topic: &str) -> Option { self.topic_config_table.get(topic).cloned() } + + pub fn build_serialize_wrapper( + &mut self, + topic_config_table: HashMap, + ) -> TopicConfigAndMappingSerializeWrapper { + self.build_serialize_wrapper_with_topic_queue_map(topic_config_table, HashMap::new()) + } + + pub fn build_serialize_wrapper_with_topic_queue_map( + &mut self, + topic_config_table: HashMap, + topic_queue_mapping_info_map: HashMap, + ) -> TopicConfigAndMappingSerializeWrapper { + if self.broker_config.enable_split_registration { + self.data_version.next_version(); + } + TopicConfigAndMappingSerializeWrapper { + topic_config_table: Some(topic_config_table), + topic_queue_mapping_info_map, + ..TopicConfigAndMappingSerializeWrapper::default() + } + } } //Fully implemented will be removed diff --git a/rocketmq-remoting/src/protocol.rs b/rocketmq-remoting/src/protocol.rs index f71c5643..3c6e19ae 100644 --- a/rocketmq-remoting/src/protocol.rs +++ b/rocketmq-remoting/src/protocol.rs @@ -21,7 +21,10 @@ use std::{ time::SystemTime, }; -use rocketmq_common::common::{mix_all, topic::TopicValidator}; +use rocketmq_common::{ + common::{mix_all, topic::TopicValidator}, + utils::time_utils, +}; use serde::{de, Deserialize, Serialize}; use crate::RocketMQSerializable; @@ -258,6 +261,17 @@ impl DataVersion { pub fn counter_inner(&self) -> &AtomicI64 { &self.counter_inner } + + pub fn next_version(&mut self) { + self.next_version_with(0) + } + + pub fn next_version_with(&mut self, state_version: i64) { + self.timestamp = time_utils::get_current_millis() as i64; + self.state_version = state_version; + self.counter_inner.fetch_add(1, Ordering::SeqCst); + self.counter = self.counter_inner.load(Ordering::Relaxed) + } } impl Display for DataVersion { diff --git a/rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs b/rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs index 562b316c..e38209f4 100644 --- a/rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs +++ b/rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs @@ -30,17 +30,17 @@ use crate::protocol::{ #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TopicConfigAndMappingSerializeWrapper { #[serde(rename = "topicQueueMappingInfoMap")] - topic_queue_mapping_info_map: HashMap, + pub topic_queue_mapping_info_map: HashMap, #[serde(rename = "topicQueueMappingDetailMap")] - topic_queue_mapping_detail_map: HashMap, + pub topic_queue_mapping_detail_map: HashMap, #[serde(rename = "mappingDataVersion")] - mapping_data_version: DataVersion, + pub mapping_data_version: DataVersion, #[serde(rename = "topicConfigTable")] - topic_config_table: Option>, + pub topic_config_table: Option>, #[serde(rename = "dataVersion")] - data_version: Option, + pub data_version: Option, } impl TopicConfigAndMappingSerializeWrapper { diff --git a/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs index 4b686a84..614133e7 100644 --- a/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs +++ b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs @@ -27,3 +27,18 @@ impl TopicQueueMappingDetail { None } } + +//impl static methods(Like java static method) +impl TopicQueueMappingDetail { + pub fn clone_as_mapping_info( + mapping_detail: &TopicQueueMappingDetail, + ) -> TopicQueueMappingInfo { + TopicQueueMappingInfo { + topic: mapping_detail.topic_queue_mapping_info.topic.clone(), + total_queues: mapping_detail.topic_queue_mapping_info.total_queues, + bname: mapping_detail.topic_queue_mapping_info.bname.clone(), + epoch: mapping_detail.topic_queue_mapping_info.epoch, + ..TopicQueueMappingInfo::default() + } + } +}