Skip to content

Commit fecc8c7

Browse files
authored
[ISSUE #263]🚧Support register borker(request code:103)-1 (#264)
1 parent 62973ca commit fecc8c7

File tree

6 files changed

+178
-9
lines changed

6 files changed

+178
-9
lines changed

rocketmq-broker/src/broker_config.rs

+8
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ pub struct BrokerConfig {
4949
pub broker_permission: i8,
5050
pub async_send_enable: bool,
5151
pub store_path_root_dir: String,
52+
pub enable_split_registration: bool,
53+
pub split_registration_size: i32,
54+
pub register_broker_timeout_mills: i32,
55+
pub is_in_broker_container: bool,
5256
}
5357

5458
impl Default for BrokerConfig {
@@ -80,6 +84,10 @@ impl Default for BrokerConfig {
8084
.join("store")
8185
.to_string_lossy()
8286
.into_owned(),
87+
enable_split_registration: false,
88+
split_registration_size: 800,
89+
register_broker_timeout_mills: 24000,
90+
is_in_broker_container: false,
8391
}
8492
}
8593
}

rocketmq-broker/src/broker_controller.rs

+108-2
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,18 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::{net::SocketAddr, sync::Arc};
17+
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
1818

19-
use rocketmq_common::{common::config_manager::ConfigManager, TokioExecutorService};
19+
use rocketmq_common::{
20+
common::{config::TopicConfig, config_manager::ConfigManager, constant::PermName},
21+
TokioExecutorService,
22+
};
2023
use rocketmq_remoting::{
2124
code::request_code::RequestCode,
25+
protocol::{
26+
body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
27+
static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail,
28+
},
2229
remoting::RemotingService,
2330
server::{rocketmq_server::RocketmqDefaultServer, RemotingServer},
2431
};
@@ -160,6 +167,7 @@ impl BrokerController {
160167
}
161168
}
162169

170+
#[allow(unused_variables)]
163171
impl BrokerController {
164172
pub async fn start(&mut self) {
165173
if self.message_store.as_mut().is_some() {
@@ -278,6 +286,104 @@ impl BrokerController {
278286
fn initial_acl(&mut self) {}
279287

280288
fn initial_rpc_hooks(&mut self) {}
289+
290+
fn register_broker_all(
291+
&mut self,
292+
check_order_config: bool,
293+
oneway: bool,
294+
force_register: bool,
295+
) {
296+
let mut topic_config_table = HashMap::new();
297+
for topic_config in self.topic_config_manager_inner.topic_config_table.values() {
298+
let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission)
299+
|| !PermName::is_readable(self.broker_config.broker_permission)
300+
{
301+
TopicConfig {
302+
topic_name: topic_config.topic_name.clone(),
303+
read_queue_nums: topic_config.read_queue_nums,
304+
write_queue_nums: topic_config.write_queue_nums,
305+
perm: topic_config.perm & self.broker_config.broker_permission as u32,
306+
..TopicConfig::default()
307+
}
308+
} else {
309+
topic_config.clone()
310+
};
311+
topic_config_table.insert(new_topic_config.topic_name.clone(), new_topic_config);
312+
}
313+
314+
// Handle split registration logic
315+
if self.broker_config.enable_split_registration
316+
&& topic_config_table.len() as i32 >= self.broker_config.split_registration_size
317+
{
318+
let topic_config_wrapper = self
319+
.topic_config_manager_inner
320+
.build_serialize_wrapper(topic_config_table.clone());
321+
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper);
322+
}
323+
324+
// Collect topicQueueMappingInfoMap
325+
let topic_queue_mapping_info_map = self
326+
.topic_queue_mapping_manager
327+
.topic_queue_mapping_table
328+
.iter()
329+
.map(|(key, value)| {
330+
(
331+
key.clone(),
332+
TopicQueueMappingDetail::clone_as_mapping_info(value),
333+
)
334+
})
335+
.collect();
336+
337+
let topic_config_wrapper = self
338+
.topic_config_manager_inner
339+
.build_serialize_wrapper_with_topic_queue_map(
340+
topic_config_table,
341+
topic_queue_mapping_info_map,
342+
);
343+
344+
if self.broker_config.enable_split_registration
345+
|| force_register
346+
|| self.need_register(
347+
self.broker_config
348+
.broker_identity
349+
.broker_cluster_name
350+
.clone()
351+
.as_str(),
352+
self.broker_config.broker_ip1.clone().as_str(),
353+
self.broker_config
354+
.broker_identity
355+
.broker_name
356+
.clone()
357+
.as_str(),
358+
self.broker_config.broker_identity.broker_id,
359+
self.broker_config.register_broker_timeout_mills,
360+
self.broker_config.is_in_broker_container,
361+
)
362+
{
363+
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper);
364+
}
365+
}
366+
367+
fn need_register(
368+
&mut self,
369+
cluster_name: &str,
370+
broker_addr: &str,
371+
broker_name: &str,
372+
broker_id: u64,
373+
register_timeout_mills: i32,
374+
in_broker_container: bool,
375+
) -> bool {
376+
unimplemented!()
377+
}
378+
379+
fn do_register_broker_all(
380+
&mut self,
381+
check_order_config: bool,
382+
oneway: bool,
383+
topic_config_wrapper: TopicConfigAndMappingSerializeWrapper,
384+
) {
385+
unimplemented!()
386+
}
281387
}
282388

283389
impl Drop for BrokerController {

rocketmq-broker/src/topic/manager/topic_config_manager.rs

+27-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ use rocketmq_common::common::{
2222
topic::TopicValidator,
2323
};
2424
use rocketmq_remoting::protocol::{
25-
body::topic_info_wrapper::TopicConfigSerializeWrapper, DataVersion,
25+
body::topic_info_wrapper::{
26+
topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, TopicConfigSerializeWrapper,
27+
},
28+
static_topic::topic_queue_info::TopicQueueMappingInfo,
29+
DataVersion,
2630
};
2731
use tracing::info;
2832

@@ -180,6 +184,28 @@ impl TopicConfigManager {
180184
pub fn select_topic_config(&self, topic: &str) -> Option<TopicConfig> {
181185
self.topic_config_table.get(topic).cloned()
182186
}
187+
188+
pub fn build_serialize_wrapper(
189+
&mut self,
190+
topic_config_table: HashMap<String, TopicConfig>,
191+
) -> TopicConfigAndMappingSerializeWrapper {
192+
self.build_serialize_wrapper_with_topic_queue_map(topic_config_table, HashMap::new())
193+
}
194+
195+
pub fn build_serialize_wrapper_with_topic_queue_map(
196+
&mut self,
197+
topic_config_table: HashMap<String, TopicConfig>,
198+
topic_queue_mapping_info_map: HashMap<String, TopicQueueMappingInfo>,
199+
) -> TopicConfigAndMappingSerializeWrapper {
200+
if self.broker_config.enable_split_registration {
201+
self.data_version.next_version();
202+
}
203+
TopicConfigAndMappingSerializeWrapper {
204+
topic_config_table: Some(topic_config_table),
205+
topic_queue_mapping_info_map,
206+
..TopicConfigAndMappingSerializeWrapper::default()
207+
}
208+
}
183209
}
184210

185211
//Fully implemented will be removed

rocketmq-remoting/src/protocol.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use std::{
2121
time::SystemTime,
2222
};
2323

24-
use rocketmq_common::common::{mix_all, topic::TopicValidator};
24+
use rocketmq_common::{
25+
common::{mix_all, topic::TopicValidator},
26+
utils::time_utils,
27+
};
2528
use serde::{de, Deserialize, Serialize};
2629

2730
use crate::RocketMQSerializable;
@@ -258,6 +261,17 @@ impl DataVersion {
258261
pub fn counter_inner(&self) -> &AtomicI64 {
259262
&self.counter_inner
260263
}
264+
265+
pub fn next_version(&mut self) {
266+
self.next_version_with(0)
267+
}
268+
269+
pub fn next_version_with(&mut self, state_version: i64) {
270+
self.timestamp = time_utils::get_current_millis() as i64;
271+
self.state_version = state_version;
272+
self.counter_inner.fetch_add(1, Ordering::SeqCst);
273+
self.counter = self.counter_inner.load(Ordering::Relaxed)
274+
}
261275
}
262276

263277
impl Display for DataVersion {

rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ use crate::protocol::{
3030
#[derive(Debug, Clone, Serialize, Deserialize)]
3131
pub struct TopicConfigAndMappingSerializeWrapper {
3232
#[serde(rename = "topicQueueMappingInfoMap")]
33-
topic_queue_mapping_info_map: HashMap<String /* topic */, TopicQueueMappingInfo>,
33+
pub topic_queue_mapping_info_map: HashMap<String /* topic */, TopicQueueMappingInfo>,
3434
#[serde(rename = "topicQueueMappingDetailMap")]
35-
topic_queue_mapping_detail_map: HashMap<String /* topic */, TopicQueueMappingDetail>,
35+
pub topic_queue_mapping_detail_map: HashMap<String /* topic */, TopicQueueMappingDetail>,
3636
#[serde(rename = "mappingDataVersion")]
37-
mapping_data_version: DataVersion,
37+
pub mapping_data_version: DataVersion,
3838

3939
#[serde(rename = "topicConfigTable")]
40-
topic_config_table: Option<HashMap<String, TopicConfig>>,
40+
pub topic_config_table: Option<HashMap<String, TopicConfig>>,
4141

4242
#[serde(rename = "dataVersion")]
43-
data_version: Option<DataVersion>,
43+
pub data_version: Option<DataVersion>,
4444
}
4545

4646
impl TopicConfigAndMappingSerializeWrapper {

rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs

+15
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,18 @@ impl TopicQueueMappingDetail {
2727
None
2828
}
2929
}
30+
31+
//impl static methods(Like java static method)
32+
impl TopicQueueMappingDetail {
33+
pub fn clone_as_mapping_info(
34+
mapping_detail: &TopicQueueMappingDetail,
35+
) -> TopicQueueMappingInfo {
36+
TopicQueueMappingInfo {
37+
topic: mapping_detail.topic_queue_mapping_info.topic.clone(),
38+
total_queues: mapping_detail.topic_queue_mapping_info.total_queues,
39+
bname: mapping_detail.topic_queue_mapping_info.bname.clone(),
40+
epoch: mapping_detail.topic_queue_mapping_info.epoch,
41+
..TopicQueueMappingInfo::default()
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)