Skip to content

[ISSUE #263]🚧Support register borker(request code:103)-1 #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rocketmq-broker/src/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ pub struct BrokerConfig {
pub broker_permission: i8,
pub async_send_enable: bool,
pub store_path_root_dir: String,
pub enable_split_registration: bool,
pub split_registration_size: i32,
pub register_broker_timeout_mills: i32,
pub is_in_broker_container: bool,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -80,6 +84,10 @@ impl Default for BrokerConfig {
.join("store")
.to_string_lossy()
.into_owned(),
enable_split_registration: false,
split_registration_size: 800,
register_broker_timeout_mills: 24000,
is_in_broker_container: false,
}
}
}
Expand Down
110 changes: 108 additions & 2 deletions rocketmq-broker/src/broker_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{net::SocketAddr, sync::Arc};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use rocketmq_common::{common::config_manager::ConfigManager, TokioExecutorService};
use rocketmq_common::{
common::{config::TopicConfig, config_manager::ConfigManager, constant::PermName},
TokioExecutorService,
};
use rocketmq_remoting::{
code::request_code::RequestCode,
protocol::{
body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail,
},
remoting::RemotingService,
server::{rocketmq_server::RocketmqDefaultServer, RemotingServer},
};
Expand Down Expand Up @@ -160,6 +167,7 @@ impl BrokerController {
}
}

#[allow(unused_variables)]
impl BrokerController {
pub async fn start(&mut self) {
if self.message_store.as_mut().is_some() {
Expand Down Expand Up @@ -278,6 +286,104 @@ impl BrokerController {
fn initial_acl(&mut self) {}

fn initial_rpc_hooks(&mut self) {}

fn register_broker_all(
&mut self,
check_order_config: bool,
oneway: bool,
force_register: bool,
) {
let mut topic_config_table = HashMap::new();
for topic_config in self.topic_config_manager_inner.topic_config_table.values() {
let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission)
|| !PermName::is_readable(self.broker_config.broker_permission)
{
TopicConfig {
topic_name: topic_config.topic_name.clone(),
read_queue_nums: topic_config.read_queue_nums,
write_queue_nums: topic_config.write_queue_nums,
perm: topic_config.perm & self.broker_config.broker_permission as u32,
..TopicConfig::default()
}
} else {
topic_config.clone()
};
topic_config_table.insert(new_topic_config.topic_name.clone(), new_topic_config);
}

// Handle split registration logic
if self.broker_config.enable_split_registration
&& topic_config_table.len() as i32 >= self.broker_config.split_registration_size
{
let topic_config_wrapper = self
.topic_config_manager_inner
.build_serialize_wrapper(topic_config_table.clone());
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper);
}

// Collect topicQueueMappingInfoMap
let topic_queue_mapping_info_map = self
.topic_queue_mapping_manager
.topic_queue_mapping_table
.iter()
.map(|(key, value)| {
(
key.clone(),
TopicQueueMappingDetail::clone_as_mapping_info(value),
)
})
.collect();

let topic_config_wrapper = self
.topic_config_manager_inner
.build_serialize_wrapper_with_topic_queue_map(
topic_config_table,
topic_queue_mapping_info_map,
);

if self.broker_config.enable_split_registration
|| force_register
|| self.need_register(
self.broker_config
.broker_identity
.broker_cluster_name
.clone()
.as_str(),
self.broker_config.broker_ip1.clone().as_str(),
self.broker_config
.broker_identity
.broker_name
.clone()
.as_str(),
self.broker_config.broker_identity.broker_id,
self.broker_config.register_broker_timeout_mills,
self.broker_config.is_in_broker_container,
)
{
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper);
}
}

fn need_register(
&mut self,
cluster_name: &str,
broker_addr: &str,
broker_name: &str,
broker_id: u64,
register_timeout_mills: i32,
in_broker_container: bool,
) -> bool {
unimplemented!()
}

fn do_register_broker_all(
&mut self,
check_order_config: bool,
oneway: bool,
topic_config_wrapper: TopicConfigAndMappingSerializeWrapper,
) {
unimplemented!()
}
}

impl Drop for BrokerController {
Expand Down
28 changes: 27 additions & 1 deletion rocketmq-broker/src/topic/manager/topic_config_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use rocketmq_common::common::{
topic::TopicValidator,
};
use rocketmq_remoting::protocol::{
body::topic_info_wrapper::TopicConfigSerializeWrapper, DataVersion,
body::topic_info_wrapper::{
topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, TopicConfigSerializeWrapper,
},
static_topic::topic_queue_info::TopicQueueMappingInfo,
DataVersion,
};
use tracing::info;

Expand Down Expand Up @@ -180,6 +184,28 @@ impl TopicConfigManager {
pub fn select_topic_config(&self, topic: &str) -> Option<TopicConfig> {
self.topic_config_table.get(topic).cloned()
}

pub fn build_serialize_wrapper(
&mut self,
topic_config_table: HashMap<String, TopicConfig>,
) -> TopicConfigAndMappingSerializeWrapper {
self.build_serialize_wrapper_with_topic_queue_map(topic_config_table, HashMap::new())
}

pub fn build_serialize_wrapper_with_topic_queue_map(
&mut self,
topic_config_table: HashMap<String, TopicConfig>,
topic_queue_mapping_info_map: HashMap<String, TopicQueueMappingInfo>,
) -> TopicConfigAndMappingSerializeWrapper {
if self.broker_config.enable_split_registration {
self.data_version.next_version();
}
TopicConfigAndMappingSerializeWrapper {
topic_config_table: Some(topic_config_table),
topic_queue_mapping_info_map,
..TopicConfigAndMappingSerializeWrapper::default()
}
}
}

//Fully implemented will be removed
Expand Down
16 changes: 15 additions & 1 deletion rocketmq-remoting/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use std::{
time::SystemTime,
};

use rocketmq_common::common::{mix_all, topic::TopicValidator};
use rocketmq_common::{
common::{mix_all, topic::TopicValidator},
utils::time_utils,
};
use serde::{de, Deserialize, Serialize};

use crate::RocketMQSerializable;
Expand Down Expand Up @@ -258,6 +261,17 @@ impl DataVersion {
pub fn counter_inner(&self) -> &AtomicI64 {
&self.counter_inner
}

pub fn next_version(&mut self) {
self.next_version_with(0)
}

pub fn next_version_with(&mut self, state_version: i64) {
self.timestamp = time_utils::get_current_millis() as i64;
self.state_version = state_version;
self.counter_inner.fetch_add(1, Ordering::SeqCst);
self.counter = self.counter_inner.load(Ordering::Relaxed)
}
}

impl Display for DataVersion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ use crate::protocol::{
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigAndMappingSerializeWrapper {
#[serde(rename = "topicQueueMappingInfoMap")]
topic_queue_mapping_info_map: HashMap<String /* topic */, TopicQueueMappingInfo>,
pub topic_queue_mapping_info_map: HashMap<String /* topic */, TopicQueueMappingInfo>,
#[serde(rename = "topicQueueMappingDetailMap")]
topic_queue_mapping_detail_map: HashMap<String /* topic */, TopicQueueMappingDetail>,
pub topic_queue_mapping_detail_map: HashMap<String /* topic */, TopicQueueMappingDetail>,
#[serde(rename = "mappingDataVersion")]
mapping_data_version: DataVersion,
pub mapping_data_version: DataVersion,

#[serde(rename = "topicConfigTable")]
topic_config_table: Option<HashMap<String, TopicConfig>>,
pub topic_config_table: Option<HashMap<String, TopicConfig>>,

#[serde(rename = "dataVersion")]
data_version: Option<DataVersion>,
pub data_version: Option<DataVersion>,
}

impl TopicConfigAndMappingSerializeWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,18 @@ impl TopicQueueMappingDetail {
None
}
}

//impl static methods(Like java static method)
impl TopicQueueMappingDetail {
pub fn clone_as_mapping_info(
mapping_detail: &TopicQueueMappingDetail,
) -> TopicQueueMappingInfo {
TopicQueueMappingInfo {
topic: mapping_detail.topic_queue_mapping_info.topic.clone(),
total_queues: mapping_detail.topic_queue_mapping_info.total_queues,
bname: mapping_detail.topic_queue_mapping_info.bname.clone(),
epoch: mapping_detail.topic_queue_mapping_info.epoch,
..TopicQueueMappingInfo::default()
}
}
}