Skip to content

Commit d325df6

Browse files
committed
[ISSUE #53]Perfect support broker register(request code 103)
1 parent 3376402 commit d325df6

16 files changed

+597
-50
lines changed

rocketmq-namesrv/src/bin/bootstrap_server.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,7 @@ fn init_processors(
8484
kvconfig_manager_inner.clone(),
8585
)),
8686
);
87-
(
88-
processors,
89-
DefaultRequestProcessor::new(namesrv_config.clone()),
90-
)
87+
(processors, DefaultRequestProcessor::new(namesrv_config))
9188
}
9289

9390
#[derive(Parser, Debug)]

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tracing::info;
2222

2323
use crate::kvconfig::KVConfigSerializeWrapper;
2424

25-
#[derive(Debug)]
25+
#[derive(Debug, Clone)]
2626
pub struct KVConfigManager {
2727
pub(crate) config_table:
2828
HashMap<String /* Namespace */, HashMap<String /* Key */, String /* Value */>>,

rocketmq-namesrv/src/processor/default_request_processor.rs

+26-6
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ use rocketmq_remoting::{
2929
broker_body::register_broker_body::RegisterBrokerBody,
3030
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
3131
},
32-
header::{
33-
broker_request_header::RegisterBrokerRequestHeader,
34-
namesrv::kv_config_request_header::PutKVConfigRequestHeader,
32+
header::namesrv::{
33+
kv_config_request_header::PutKVConfigRequestHeader,
34+
register_broker_header::{RegisterBrokerRequestHeader, RegisterBrokerResponseHeader},
3535
},
3636
remoting_command::RemotingCommand,
3737
RemotingSerializable,
@@ -123,7 +123,7 @@ impl DefaultRequestProcessor {
123123
.set_remark(Some(String::from("crc32 not match")));
124124
}
125125

126-
let response_command = RemotingCommand::create_response_command();
126+
let mut response_command = RemotingCommand::create_response_command();
127127
let broker_version = RocketMqVersion::try_from(request.version()).unwrap();
128128
let topic_config_wrapper;
129129
let mut filter_server_list = Vec::<String>::new();
@@ -159,10 +159,30 @@ impl DefaultRequestProcessor {
159159
);
160160
if result.is_none() {
161161
return response_command
162-
.set_code(RemotingSysResponseCode::SystemError as i32)
162+
.set_code(RemotingSysResponseCode::SystemError)
163163
.set_remark(Some(String::from("register broker failed")));
164164
}
165-
response_command.set_code(RemotingSysResponseCode::Success)
165+
if self
166+
.kvconfig_manager
167+
.read()
168+
.namesrv_config
169+
.return_order_topic_config_to_broker
170+
{
171+
if let Some(value) = self
172+
.kvconfig_manager
173+
.write()
174+
.get_kv_list_by_namespace("ORDER_TOPIC_CONFIG")
175+
{
176+
response_command = response_command.set_body(Some(Bytes::from(value)));
177+
}
178+
}
179+
let register_broker_result = result.unwrap();
180+
response_command
181+
.set_code(RemotingSysResponseCode::Success)
182+
.set_command_custom_header(Some(Box::new(RegisterBrokerResponseHeader::new(
183+
Some(register_broker_result.ha_server_addr),
184+
Some(register_broker_result.master_addr),
185+
))))
166186
}
167187
}
168188

rocketmq-namesrv/src/route/route_info_manager.rs

+117-30
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,28 @@
1717

1818
use std::{
1919
collections::{HashMap, HashSet},
20-
time::SystemTime,
20+
time::{Duration, SystemTime},
2121
};
2222

2323
use rocketmq_common::common::{
2424
config::TopicConfig, constant::PermName, mix_all, namesrv::namesrv_config::NamesrvConfig,
2525
topic::TopicValidator,
2626
};
27-
use rocketmq_remoting::protocol::{
28-
body::{
29-
broker_body::cluster_info::ClusterInfo,
30-
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
27+
use rocketmq_remoting::{
28+
clients::RemoteClient,
29+
code::request_code::RequestCode,
30+
protocol::{
31+
body::{
32+
broker_body::cluster_info::ClusterInfo,
33+
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
34+
},
35+
header::namesrv::brokerid_change_request_header::NotifyMinBrokerIdChangeRequestHeader,
36+
namesrv::RegisterBrokerResult,
37+
remoting_command::RemotingCommand,
38+
route::route_data_view::{BrokerData, QueueData, TopicRouteData},
39+
static_topic::topic_queue_info::TopicQueueMappingInfo,
40+
DataVersion,
3141
},
32-
namesrv::RegisterBrokerResult,
33-
route::route_data_view::{BrokerData, QueueData, TopicRouteData},
34-
static_topic::topic_queue_info::TopicQueueMappingInfo,
35-
DataVersion,
3642
};
3743
use tracing::{debug, info, warn};
3844

@@ -52,13 +58,14 @@ type TopicQueueMappingInfoTable =
5258

5359
#[derive(Debug, Clone, Default)]
5460
pub struct RouteInfoManager {
55-
topic_queue_table: TopicQueueTable,
56-
broker_addr_table: BrokerAddrTable,
57-
cluster_addr_table: ClusterAddrTable,
58-
broker_live_table: BrokerLiveTable,
59-
filter_server_table: FilterServerTable,
60-
topic_queue_mapping_info_table: TopicQueueMappingInfoTable,
61-
namesrv_config: NamesrvConfig,
61+
pub(crate) topic_queue_table: TopicQueueTable,
62+
pub(crate) broker_addr_table: BrokerAddrTable,
63+
pub(crate) cluster_addr_table: ClusterAddrTable,
64+
pub(crate) broker_live_table: BrokerLiveTable,
65+
pub(crate) filter_server_table: FilterServerTable,
66+
pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable,
67+
pub(crate) namesrv_config: NamesrvConfig,
68+
pub(crate) remote_client: RemoteClient,
6269
}
6370

6471
#[allow(private_interfaces)]
@@ -76,6 +83,7 @@ impl RouteInfoManager {
7683
filter_server_table: HashMap::new(),
7784
topic_queue_mapping_info_table: HashMap::new(),
7885
namesrv_config,
86+
remote_client: RemoteClient::new(),
7987
}
8088
}
8189
}
@@ -106,26 +114,26 @@ impl RouteInfoManager {
106114
.unwrap()
107115
.insert(broker_name.clone());
108116

109-
let is_old_version_broker = if let Some(value) = enable_acting_master {
117+
let enable_acting_master_inner = if let Some(value) = enable_acting_master {
110118
value
111119
} else {
112120
false
113121
};
114122
let mut register_first =
115123
if let Some(broker_data) = self.broker_addr_table.get_mut(&broker_name) {
116-
broker_data.set_enable_acting_master(is_old_version_broker);
124+
broker_data.set_enable_acting_master(enable_acting_master_inner);
117125
broker_data.set_zone_name(zone_name.clone());
118126
false
119127
} else {
120-
self.broker_addr_table.insert(
128+
let mut broker_data = BrokerData::new(
129+
cluster_name.clone(),
121130
broker_name.clone(),
122-
BrokerData::new(
123-
cluster_name.clone(),
124-
broker_name.clone(),
125-
HashMap::new(),
126-
zone_name,
127-
),
131+
HashMap::new(),
132+
zone_name,
128133
);
134+
broker_data.set_enable_acting_master(enable_acting_master_inner);
135+
self.broker_addr_table
136+
.insert(broker_name.clone(), broker_data);
129137
true
130138
};
131139
let broker_data = self.broker_addr_table.get_mut(&broker_name).unwrap();
@@ -142,18 +150,31 @@ impl RouteInfoManager {
142150
// IP:PORT> The same IP:PORT must only have one record in brokerAddrTable
143151
broker_data.remove_broker_by_addr(broker_id, &broker_addr);
144152

153+
//If Local brokerId stateVersion bigger than the registering one
145154
if let Some(old_broker_addr) = broker_data.broker_addrs().get(&broker_id) {
146155
if old_broker_addr != &broker_addr {
147-
let addr_info =
156+
let addr_info_old =
148157
BrokerAddrInfo::new(cluster_name.clone(), old_broker_addr.to_string());
149-
if let Some(val) = self.broker_live_table.get(&addr_info) {
158+
if let Some(val) = self.broker_live_table.get(&addr_info_old) {
150159
let old_state_version = val.data_version().state_version();
151160
let new_state_version = topic_config_serialize_wrapper
152161
.data_version()
153162
.as_ref()
154163
.unwrap()
155164
.state_version();
156165
if old_state_version > new_state_version {
166+
warn!(
167+
"Registered Broker conflicts with the existed one, just ignore.: \
168+
Cluster:{}, BrokerName:{}, BrokerId:{},Old BrokerAddr:{}, Old \
169+
Version:{}, New BrokerAddr:{}, New Version:{}.",
170+
&cluster_name,
171+
&broker_name,
172+
broker_id,
173+
old_broker_addr,
174+
old_state_version,
175+
&broker_addr,
176+
new_state_version
177+
);
157178
self.broker_live_table.remove(
158179
BrokerAddrInfo::new(cluster_name.clone(), broker_addr.clone()).as_ref(),
159180
);
@@ -184,14 +205,18 @@ impl RouteInfoManager {
184205
register_first |= old_addr.is_none();
185206
let is_master = mix_all::MASTER_ID == broker_id as u64;
186207

187-
let is_prime_slave = !is_old_version_broker
208+
let is_prime_slave = enable_acting_master.is_some()
188209
&& !is_master
189210
&& broker_id == broker_data.broker_addrs().keys().min().copied().unwrap();
190211
let broker_data = broker_data.clone();
212+
//handle master or prime slave topic config update
191213
if is_master || is_prime_slave {
192214
if let Some(tc_table) = topic_config_serialize_wrapper.topic_config_table() {
193215
let topic_queue_mapping_info_map =
194216
topic_config_serialize_wrapper.topic_queue_mapping_info_map();
217+
218+
// Delete the topics that don't exist in tcTable from the current broker
219+
// Static topic is not supported currently
195220
if self.namesrv_config.delete_topic_with_broker_registration
196221
&& topic_queue_mapping_info_map.is_empty()
197222
{
@@ -284,7 +309,7 @@ impl RouteInfoManager {
284309
self.filter_server_table.remove(&broker_addr_info);
285310
} else {
286311
self.filter_server_table
287-
.insert(broker_addr_info, filter_server_list);
312+
.insert(broker_addr_info.clone(), filter_server_list);
288313
}
289314

290315
if mix_all::MASTER_ID != broker_id as u64 {
@@ -300,7 +325,17 @@ impl RouteInfoManager {
300325
}
301326
}
302327
if is_min_broker_id_changed && self.namesrv_config.notify_min_broker_id_changed {
303-
todo!()
328+
self.notify_min_broker_id_changed(
329+
broker_data.broker_addrs(),
330+
None,
331+
Some(
332+
self.broker_live_table
333+
.get(&broker_addr_info)
334+
.unwrap()
335+
.ha_server_addr()
336+
.to_string(),
337+
),
338+
)
304339
}
305340
Some(result)
306341
}
@@ -526,4 +561,56 @@ impl RouteInfoManager {
526561
.insert(topic_config.topic_name.clone(), queue_data_map_inner);
527562
}
528563
}
564+
565+
fn notify_min_broker_id_changed(
566+
&mut self,
567+
broker_addr_map: &HashMap<i64, String>,
568+
offline_broker_addr: Option<String>,
569+
ha_broker_addr: Option<String>,
570+
) {
571+
if broker_addr_map.is_empty() {
572+
return;
573+
}
574+
let min_broker_id = broker_addr_map.keys().min().copied().unwrap();
575+
// notify master
576+
let request_header = NotifyMinBrokerIdChangeRequestHeader::new(
577+
Some(min_broker_id),
578+
None,
579+
broker_addr_map.get(&min_broker_id).cloned(),
580+
offline_broker_addr.clone(),
581+
ha_broker_addr,
582+
);
583+
584+
if let Some(broker_addrs_notify) =
585+
self.choose_broker_addrs_to_notify(broker_addr_map, offline_broker_addr)
586+
{
587+
for broker_addr in broker_addrs_notify {
588+
let _ = self.remote_client.invoke_oneway(
589+
broker_addr,
590+
RemotingCommand::create_request_command(
591+
RequestCode::NotifyMinBrokerIdChange,
592+
request_header.clone(),
593+
),
594+
Duration::from_millis(3000),
595+
);
596+
}
597+
}
598+
}
599+
600+
fn choose_broker_addrs_to_notify(
601+
&mut self,
602+
broker_addr_map: &HashMap<i64, String>,
603+
offline_broker_addr: Option<String>,
604+
) -> Option<Vec<String>> {
605+
if broker_addr_map.len() == 1 || offline_broker_addr.is_some() {
606+
return Some(broker_addr_map.values().cloned().collect());
607+
}
608+
let min_broker_id = broker_addr_map.keys().min().copied().unwrap();
609+
let broker_addr_vec = broker_addr_map
610+
.iter()
611+
.filter(|(key, _value)| **key != min_broker_id)
612+
.map(|(_, value)| value.clone())
613+
.collect();
614+
Some(broker_addr_vec)
615+
}
529616
}

rocketmq-remoting/src/clients.rs

+53
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,58 @@
1515
* limitations under the License.
1616
*/
1717

18+
use std::{
19+
collections::HashMap,
20+
fmt::{Debug, Formatter},
21+
time::Duration,
22+
};
23+
24+
pub use blocking_client::BlockingClient;
25+
pub use client::Client;
26+
27+
use crate::protocol::remoting_command::RemotingCommand;
28+
1829
mod async_client;
30+
mod blocking_client;
31+
1932
mod client;
33+
34+
#[derive(Default)]
35+
pub struct RemoteClient {
36+
inner: HashMap<String, BlockingClient>,
37+
}
38+
39+
impl Clone for RemoteClient {
40+
fn clone(&self) -> Self {
41+
Self {
42+
inner: HashMap::new(),
43+
}
44+
}
45+
}
46+
47+
impl Debug for RemoteClient {
48+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49+
f.write_str("RemoteClient")
50+
}
51+
}
52+
53+
impl RemoteClient {
54+
/// Create a new `RemoteClient` instance.
55+
pub fn new() -> Self {
56+
Self {
57+
inner: HashMap::new(),
58+
}
59+
}
60+
61+
pub fn invoke_oneway(
62+
&mut self,
63+
addr: String,
64+
request: RemotingCommand,
65+
timeout: Duration,
66+
) -> anyhow::Result<()> {
67+
self.inner
68+
.entry(addr.clone())
69+
.or_insert_with(|| BlockingClient::connect(addr).unwrap())
70+
.invoke_oneway(request, timeout)
71+
}
72+
}

rocketmq-remoting/src/clients/async_client.rs

+9
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,12 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
18+
/// Connection with broker or nameserver
19+
pub struct AsyncClient {
20+
/// The asynchronous `Client`.
21+
inner: crate::clients::Client,
22+
23+
///Create a separate thread pool to handle clients.
24+
rt: tokio::runtime::Runtime,
25+
}

0 commit comments

Comments
 (0)