Skip to content

Commit afa26af

Browse files
committed
[ISSUE #82]Namesrv support delete topic in namesrv (request code 216)
1 parent 820c0e2 commit afa26af

File tree

4 files changed

+102
-1
lines changed

4 files changed

+102
-1
lines changed

rocketmq-namesrv/src/processor/default_request_processor.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use rocketmq_remoting::{
4949
QueryDataVersionRequestHeader, QueryDataVersionResponseHeader,
5050
},
5151
register_broker_header::{RegisterBrokerRequestHeader, RegisterBrokerResponseHeader},
52+
topic_operation_header::DeleteTopicFromNamesrvRequestHeader,
5253
},
5354
remoting_command::RemotingCommand,
5455
DataVersion, RemotingSerializable,
@@ -86,7 +87,7 @@ impl RequestProcessor for DefaultRequestProcessor {
8687
Some(RequestCode::GetAllTopicListFromNameserver) => {
8788
self.get_all_topic_list_from_nameserver(request)
8889
}
89-
90+
Some(RequestCode::DeleteTopicInNamesrv) => self.delete_topic_in_name_srv(request),
9091
_ => RemotingCommand::create_response_command_with_code(
9192
RemotingSysResponseCode::SystemError,
9293
),
@@ -363,6 +364,17 @@ impl DefaultRequestProcessor {
363364
RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::SystemError)
364365
.set_remark(Some(String::from("disable")))
365366
}
367+
368+
fn delete_topic_in_name_srv(&mut self, request: RemotingCommand) -> RemotingCommand {
369+
let request_header = request
370+
.decode_command_custom_header::<DeleteTopicFromNamesrvRequestHeader>()
371+
.unwrap();
372+
self.route_info_manager.write().delete_topic(
373+
request_header.topic.as_str(),
374+
request_header.cluster_name.clone(),
375+
);
376+
RemotingCommand::create_response_command()
377+
}
366378
}
367379

368380
fn extract_register_topic_config_from_request(

rocketmq-namesrv/src/route/route_info_manager.rs

+27
Original file line numberDiff line numberDiff line change
@@ -689,4 +689,31 @@ impl RouteInfoManager {
689689
broker_addr: None,
690690
}
691691
}
692+
693+
pub(crate) fn delete_topic(
694+
&mut self,
695+
topic: impl Into<String>,
696+
cluster_name: Option<impl Into<String>>,
697+
) {
698+
let topic_inner = topic.into();
699+
if cluster_name.is_some() {
700+
let cluster_name_inner = cluster_name.map(|s| s.into()).unwrap();
701+
let broker_names = self.cluster_addr_table.get(cluster_name_inner.as_str());
702+
if broker_names.is_none() || broker_names.unwrap().is_empty() {
703+
return;
704+
}
705+
if let Some(queue_data_map) = self.topic_queue_table.get_mut(topic_inner.as_str()) {
706+
for broker_name in broker_names.unwrap() {
707+
if let Some(remove_qd) = queue_data_map.remove(broker_name) {
708+
info!(
709+
"deleteTopic, remove one broker's topic {} {} {:?}",
710+
broker_name, &topic_inner, remove_qd
711+
)
712+
}
713+
}
714+
}
715+
} else {
716+
self.topic_queue_table.remove(topic_inner.as_str());
717+
}
718+
}
692719
}

rocketmq-remoting/src/protocol/header/namesrv.rs

+1
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ pub mod kv_config_header;
2121
pub mod perm_broker_header;
2222
pub mod query_data_version_header;
2323
pub mod register_broker_header;
24+
pub mod topic_operation_header;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::HashMap;
19+
20+
use serde::{Deserialize, Serialize};
21+
22+
use crate::protocol::command_custom_header::{CommandCustomHeader, FromMap};
23+
24+
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
25+
#[serde(rename_all = "camelCase")]
26+
pub struct DeleteTopicFromNamesrvRequestHeader {
27+
pub topic: String,
28+
pub cluster_name: Option<String>,
29+
}
30+
31+
impl DeleteTopicFromNamesrvRequestHeader {
32+
const TOPIC: &'static str = "topic";
33+
const CLUSTER_NAME: &'static str = "clusterName";
34+
pub fn new(topic: impl Into<String>, cluster_name: Option<impl Into<String>>) -> Self {
35+
Self {
36+
topic: topic.into(),
37+
cluster_name: cluster_name.map(|s| s.into()),
38+
}
39+
}
40+
}
41+
42+
impl CommandCustomHeader for DeleteTopicFromNamesrvRequestHeader {
43+
fn to_map(&self) -> Option<HashMap<String, String>> {
44+
let mut map = HashMap::from([(Self::TOPIC.to_string(), self.topic.clone())]);
45+
if let Some(ref cluster_name) = self.cluster_name {
46+
map.insert(Self::CLUSTER_NAME.to_string(), cluster_name.clone());
47+
}
48+
Some(map)
49+
}
50+
}
51+
52+
impl FromMap for DeleteTopicFromNamesrvRequestHeader {
53+
type Target = Self;
54+
55+
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
56+
Some(DeleteTopicFromNamesrvRequestHeader {
57+
topic: map.get(Self::TOPIC).cloned().unwrap_or_default(),
58+
cluster_name: map.get(Self::CLUSTER_NAME).map(|s| s.into()),
59+
})
60+
}
61+
}

0 commit comments

Comments
 (0)