Skip to content

Commit 6086ba6

Browse files
committed
[ISSUE #75] 🚀 Namesrv support add write perm of brober (request code 327)
1 parent f6eb09d commit 6086ba6

File tree

5 files changed

+94
-2
lines changed

5 files changed

+94
-2
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ Feature list:
9494
| Get broker member_group | 901 | :sparkling_heart: :white_check_mark: | |
9595
| Get broker cluster info | 106 | :sparkling_heart: :white_check_mark: | |
9696
| Wipe write perm of boker | 205 | :sparkling_heart: :white_check_mark: | |
97-
| Add write perm of brober | 327 | :broken_heart: :x: | |
97+
| Add write perm of brober | 327 | :sparkling_heart: :white_check_mark: | |
9898
| Get all topic list from name server | 206 | :broken_heart: :x: | |
9999
| Delete topic in name server | 216 | :broken_heart: :x: | |
100100
| Register topic in name server | 217 | :broken_heart: :x: | |

rocketmq-namesrv/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Feature list:
2727
| Get broker member_group | 901 | :sparkling_heart: :white_check_mark: | |
2828
| Get broker cluster info | 106 | :sparkling_heart: :white_check_mark: | |
2929
| Wipe write perm of boker | 205 | :sparkling_heart: :white_check_mark: | |
30-
| Add write perm of brober | 327 | :broken_heart: :x: | |
30+
| Add write perm of brober | 327 | :sparkling_heart: :white_check_mark: | |
3131
| Get all topic list from name server | 206 | :broken_heart: :x: | |
3232
| Delete topic in name server | 216 | :broken_heart: :x: | |
3333
| Register topic in name server | 217 | :broken_heart: :x: | |

rocketmq-namesrv/src/processor/default_request_processor.rs

+15
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use rocketmq_remoting::{
4242
PutKVConfigRequestHeader,
4343
},
4444
perm_broker_header::{
45+
AddWritePermOfBrokerRequestHeader, AddWritePermOfBrokerResponseHeader,
4546
WipeWritePermOfBrokerRequestHeader, WipeWritePermOfBrokerResponseHeader,
4647
},
4748
query_data_version_header::{
@@ -81,6 +82,7 @@ impl RequestProcessor for DefaultRequestProcessor {
8182
//handle get broker cluster info
8283
Some(RequestCode::GetBrokerClusterInfo) => self.get_broker_cluster_info(request),
8384
Some(RequestCode::WipeWritePermOfBroker) => self.wipe_write_perm_of_broker(request),
85+
Some(RequestCode::AddWritePermOfBroker) => self.add_write_perm_of_broker(request),
8486

8587
_ => RemotingCommand::create_response_command_with_code(
8688
RemotingSysResponseCode::SystemError,
@@ -333,6 +335,19 @@ impl DefaultRequestProcessor {
333335
WipeWritePermOfBrokerResponseHeader::new(wipe_topic_cnt),
334336
)))
335337
}
338+
339+
fn add_write_perm_of_broker(&mut self, request: RemotingCommand) -> RemotingCommand {
340+
let request_header = request
341+
.decode_command_custom_header::<AddWritePermOfBrokerRequestHeader>()
342+
.unwrap();
343+
let add_topic_cnt = self
344+
.route_info_manager
345+
.write()
346+
.add_write_perm_of_broker_by_lock(request_header.broker_name.as_str());
347+
RemotingCommand::create_response_command().set_command_custom_header(Some(Box::new(
348+
AddWritePermOfBrokerResponseHeader::new(add_topic_cnt),
349+
)))
350+
}
336351
}
337352

338353
fn extract_register_topic_config_from_request(

rocketmq-namesrv/src/route/route_info_manager.rs

+4
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,10 @@ impl RouteInfoManager {
648648
self.operate_write_perm_of_broker(broker_name, RequestCode::WipeWritePermOfBroker)
649649
}
650650

651+
pub(crate) fn add_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 {
652+
self.operate_write_perm_of_broker(broker_name, RequestCode::AddWritePermOfBroker)
653+
}
654+
651655
fn operate_write_perm_of_broker(
652656
&mut self,
653657
broker_name: &str,

rocketmq-remoting/src/protocol/header/namesrv/perm_broker_header.rs

+73
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,76 @@ impl FromMap for WipeWritePermOfBrokerResponseHeader {
9292
})
9393
}
9494
}
95+
96+
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
97+
#[serde(rename_all = "camelCase")]
98+
pub struct AddWritePermOfBrokerRequestHeader {
99+
pub broker_name: String,
100+
}
101+
102+
impl AddWritePermOfBrokerRequestHeader {
103+
const BROKER_NAME: &'static str = "brokerName";
104+
pub fn new(broker_name: impl Into<String>) -> Self {
105+
Self {
106+
broker_name: broker_name.into(),
107+
}
108+
}
109+
}
110+
111+
impl CommandCustomHeader for AddWritePermOfBrokerRequestHeader {
112+
fn to_map(&self) -> Option<HashMap<String, String>> {
113+
Some(HashMap::from([(
114+
Self::BROKER_NAME.to_string(),
115+
self.broker_name.clone(),
116+
)]))
117+
}
118+
}
119+
120+
impl FromMap for AddWritePermOfBrokerRequestHeader {
121+
type Target = Self;
122+
123+
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
124+
Some(AddWritePermOfBrokerRequestHeader {
125+
broker_name: map
126+
.get(AddWritePermOfBrokerRequestHeader::BROKER_NAME)
127+
.cloned()
128+
.unwrap_or_default(),
129+
})
130+
}
131+
}
132+
133+
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
134+
#[serde(rename_all = "camelCase")]
135+
pub struct AddWritePermOfBrokerResponseHeader {
136+
pub add_topic_count: i32,
137+
}
138+
139+
impl AddWritePermOfBrokerResponseHeader {
140+
const ADD_TOPIC_COUNT: &'static str = "addTopicCount";
141+
142+
pub fn new(add_topic_count: i32) -> Self {
143+
Self { add_topic_count }
144+
}
145+
}
146+
147+
impl CommandCustomHeader for AddWritePermOfBrokerResponseHeader {
148+
fn to_map(&self) -> Option<HashMap<String, String>> {
149+
Some(HashMap::from([(
150+
Self::ADD_TOPIC_COUNT.to_string(),
151+
self.add_topic_count.to_string(),
152+
)]))
153+
}
154+
}
155+
156+
impl FromMap for AddWritePermOfBrokerResponseHeader {
157+
type Target = Self;
158+
159+
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
160+
Some(AddWritePermOfBrokerResponseHeader {
161+
add_topic_count: map
162+
.get(AddWritePermOfBrokerResponseHeader::ADD_TOPIC_COUNT)
163+
.and_then(|s| s.parse::<i32>().ok())
164+
.unwrap_or(0),
165+
})
166+
}
167+
}

0 commit comments

Comments
 (0)