Skip to content

Commit be2ab3f

Browse files
authored
[ISSUE #433]👷Improving Methods of TopicQueueMappingManager Struct (#434)
* Improving Methods of TopicQueueMappingManager Struct * fix ci error * fix ci error * fix ci error
1 parent 65317cf commit be2ab3f

File tree

10 files changed

+308
-71
lines changed

10 files changed

+308
-71
lines changed

rocketmq-broker/src/mqtrace/send_message_context.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl SendMessageContext {
7474
pub fn broker_addr(&mut self, broker_addr: String) {
7575
self.broker_addr = broker_addr;
7676
}
77-
pub fn queue_id(&mut self, queue_id: i32) {
78-
self.queue_id = Some(queue_id);
77+
pub fn queue_id(&mut self, queue_id: Option<i32>) {
78+
self.queue_id = queue_id;
7979
}
8080
pub fn queue_offset(&mut self, queue_offset: i64) {
8181
self.queue_offset = Some(queue_offset);

rocketmq-broker/src/processor/send_message_processor.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ impl<MS: MessageStore + Send> SendMessageProcessor<MS> {
101101
let mapping_context = self
102102
.topic_queue_mapping_manager
103103
.build_topic_queue_mapping_context(&request_header, true);
104-
let rewrite_result = self
105-
.topic_queue_mapping_manager
106-
.rewrite_request_for_static_topic(&request_header, &mapping_context);
104+
let rewrite_result = TopicQueueMappingManager::rewrite_request_for_static_topic(
105+
&mut request_header,
106+
&mapping_context,
107+
);
107108
if let Some(rewrite_result) = rewrite_result {
108109
return Some(rewrite_result);
109110
}
@@ -222,13 +223,13 @@ impl<MS: MessageStore + Send + Clone> SendMessageProcessor<MS> {
222223
.select_topic_config(request_header.topic().as_str())
223224
.unwrap();
224225
let mut queue_id = request_header.queue_id;
225-
if queue_id < 0 {
226-
queue_id = self.inner.random_queue_id(topic_config.write_queue_nums) as i32;
226+
if queue_id.is_none() || queue_id.unwrap() < 0 {
227+
queue_id = Some(self.inner.random_queue_id(topic_config.write_queue_nums) as i32);
227228
}
228229

229230
let mut message_ext = MessageExtBrokerInner::default();
230231
message_ext.message_ext_inner.message.topic = request_header.topic().to_string();
231-
message_ext.message_ext_inner.queue_id = queue_id;
232+
message_ext.message_ext_inner.queue_id = *queue_id.as_ref().unwrap();
232233
let mut ori_props =
233234
MessageDecoder::string_to_message_properties(request_header.properties.as_ref());
234235
if self.handle_retry_and_dlq(
@@ -321,7 +322,7 @@ impl<MS: MessageStore + Send + Clone> SendMessageProcessor<MS> {
321322
response,
322323
&request,
323324
topic.as_str(),
324-
queue_id,
325+
*queue_id.as_ref().unwrap(),
325326
)
326327
}
327328

rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs

+128-39
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,20 @@
1818
use std::{collections::HashMap, sync::Arc};
1919

2020
use rocketmq_common::common::{broker::broker_config::BrokerConfig, config_manager::ConfigManager};
21-
use rocketmq_remoting::protocol::{
22-
body::topic_info_wrapper::topic_queue_wrapper::TopicQueueMappingSerializeWrapper,
23-
header::message_operation_header::TopicRequestHeaderTrait,
24-
remoting_command::RemotingCommand,
25-
static_topic::{
26-
topic_queue_mapping_context::TopicQueueMappingContext,
27-
topic_queue_mapping_detail::TopicQueueMappingDetail,
21+
use rocketmq_remoting::{
22+
code::response_code::ResponseCode,
23+
protocol::{
24+
body::topic_info_wrapper::topic_queue_wrapper::TopicQueueMappingSerializeWrapper,
25+
header::message_operation_header::TopicRequestHeaderTrait,
26+
remoting_command::RemotingCommand,
27+
static_topic::{
28+
topic_queue_mapping_context::TopicQueueMappingContext,
29+
topic_queue_mapping_detail::TopicQueueMappingDetail,
30+
},
31+
DataVersion, RemotingSerializable,
2832
},
29-
DataVersion,
3033
};
34+
use tracing::{info, warn};
3135

3236
use crate::broker_path_config_helper::get_topic_queue_mapping_path;
3337

@@ -47,6 +51,35 @@ impl TopicQueueMappingManager {
4751
}
4852
}
4953

54+
pub(crate) fn rewrite_request_for_static_topic(
55+
request_header: &mut impl TopicRequestHeaderTrait,
56+
mapping_context: &TopicQueueMappingContext,
57+
) -> Option<RemotingCommand> {
58+
mapping_context.mapping_detail.as_ref()?;
59+
60+
if !mapping_context.is_leader() {
61+
let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap();
62+
return Some(RemotingCommand::create_response_command_with_code_remark(
63+
ResponseCode::NotLeaderForQueue,
64+
format!(
65+
"{}-{:?} does not exit in request process of current broker {}",
66+
request_header.topic(),
67+
request_header.queue_id(),
68+
mapping_detail
69+
.topic_queue_mapping_info
70+
.bname
71+
.clone()
72+
.unwrap_or_default()
73+
),
74+
));
75+
}
76+
let mapping_item = mapping_context.leader_item.as_ref().unwrap();
77+
request_header.set_queue_id(Some(mapping_item.queue_id));
78+
None
79+
}
80+
}
81+
82+
impl TopicQueueMappingManager {
5083
pub(crate) fn build_topic_queue_mapping_context(
5184
&self,
5285
request_header: &impl TopicRequestHeaderTrait,
@@ -66,13 +99,8 @@ impl TopicQueueMappingManager {
6699
}
67100
let topic = request_header.topic();
68101

69-
let mut global_id: Option<i32> = None;
70-
/*if let Some(header) = request_header
71-
.as_any()
72-
.downcast_ref::<TopicQueueRequestHeaderT>()
73-
{
74-
global_id = header.queue_id;
75-
}*/
102+
let mut global_id: Option<i32> = request_header.queue_id();
103+
76104
if let Some(mapping_detail) = self.topic_queue_mapping_table.lock().get(&topic) {
77105
// it is not static topic
78106
if mapping_detail
@@ -164,41 +192,45 @@ impl TopicQueueMappingManager {
164192
}
165193
}
166194

167-
pub(crate) fn rewrite_request_for_static_topic(
168-
&self,
169-
_request_header: &impl TopicRequestHeaderTrait,
170-
_mapping_context: &TopicQueueMappingContext,
171-
) -> Option<RemotingCommand> {
172-
//TODO
173-
None
174-
}
175-
176195
pub fn get_topic_queue_mapping(&self, topic: &str) -> Option<TopicQueueMappingDetail> {
177196
self.topic_queue_mapping_table.lock().get(topic).cloned()
178197
}
198+
199+
pub fn delete(&self, topic: &str) {
200+
let old = self.topic_queue_mapping_table.lock().remove(topic);
201+
match old {
202+
None => {
203+
warn!(
204+
"delete topic queue mapping failed, static topic: {} not exists",
205+
topic
206+
)
207+
}
208+
Some(value) => {
209+
info!(
210+
"delete topic queue mapping OK, static topic queue mapping: {:?}",
211+
value
212+
);
213+
self.data_version.lock().next_version();
214+
self.persist();
215+
}
216+
}
217+
}
179218
}
180219

181220
//Fully implemented will be removed
182-
#[allow(unused_variables)]
183221
impl ConfigManager for TopicQueueMappingManager {
184-
fn decode0(&mut self, key: &[u8], body: &[u8]) {
185-
todo!()
186-
}
187-
188-
fn stop(&mut self) -> bool {
189-
todo!()
190-
}
191-
192222
fn config_file_path(&self) -> String {
193223
get_topic_queue_mapping_path(self.broker_config.store_path_root_dir.as_str())
194224
}
195-
196-
fn encode(&mut self) -> String {
197-
todo!()
198-
}
199-
200225
fn encode_pretty(&self, pretty_format: bool) -> String {
201-
todo!()
226+
let wrapper = TopicQueueMappingSerializeWrapper::new(
227+
Some(self.topic_queue_mapping_table.lock().clone()),
228+
Some(self.data_version.lock().clone()),
229+
);
230+
match pretty_format {
231+
true => wrapper.to_json_pretty(),
232+
false => wrapper.to_json(),
233+
}
202234
}
203235

204236
fn decode(&self, json_string: &str) {
@@ -219,3 +251,60 @@ impl ConfigManager for TopicQueueMappingManager {
219251
}
220252
}
221253
}
254+
255+
#[cfg(test)]
256+
mod tests {
257+
use std::sync::Arc;
258+
259+
use rocketmq_common::common::broker::broker_config::BrokerConfig;
260+
261+
use super::*;
262+
263+
#[test]
264+
fn new_creates_default_manager() {
265+
let broker_config = Arc::new(BrokerConfig::default());
266+
let manager = TopicQueueMappingManager::new(broker_config.clone());
267+
268+
assert_eq!(Arc::ptr_eq(&manager.broker_config, &broker_config), true);
269+
assert_eq!(manager.data_version.lock().get_state_version(), 0);
270+
assert_eq!(manager.topic_queue_mapping_table.lock().len(), 0);
271+
}
272+
273+
#[test]
274+
fn get_topic_queue_mapping_returns_none_for_non_existent_topic() {
275+
let broker_config = Arc::new(BrokerConfig::default());
276+
let manager = TopicQueueMappingManager::new(broker_config);
277+
278+
assert!(manager
279+
.get_topic_queue_mapping("non_existent_topic")
280+
.is_none());
281+
}
282+
283+
#[test]
284+
fn get_topic_queue_mapping_returns_mapping_for_existing_topic() {
285+
let broker_config = Arc::new(BrokerConfig::default());
286+
let manager = TopicQueueMappingManager::new(broker_config);
287+
let detail = TopicQueueMappingDetail::default();
288+
manager
289+
.topic_queue_mapping_table
290+
.lock()
291+
.insert("existing_topic".to_string(), detail.clone());
292+
293+
assert!(manager.get_topic_queue_mapping("existing_topic").is_some());
294+
}
295+
296+
#[test]
297+
fn delete_removes_existing_topic() {
298+
let broker_config = Arc::new(BrokerConfig::default());
299+
let manager = TopicQueueMappingManager::new(broker_config);
300+
let detail = TopicQueueMappingDetail::default();
301+
manager
302+
.topic_queue_mapping_table
303+
.lock()
304+
.insert("existing_topic".to_string(), detail.clone());
305+
306+
manager.delete("existing_topic");
307+
308+
assert!(manager.get_topic_queue_mapping("existing_topic").is_none());
309+
}
310+
}

rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_queue_wrapper.rs

+12
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ pub struct TopicQueueMappingSerializeWrapper {
3131
data_version: Option<DataVersion>,
3232
}
3333

34+
impl TopicQueueMappingSerializeWrapper {
35+
pub fn new(
36+
topic_queue_mapping_info_map: Option<HashMap<String, TopicQueueMappingDetail>>,
37+
data_version: Option<DataVersion>,
38+
) -> Self {
39+
Self {
40+
topic_queue_mapping_info_map,
41+
data_version,
42+
}
43+
}
44+
}
45+
3446
impl TopicQueueMappingSerializeWrapper {
3547
pub fn topic_queue_mapping_info_map(
3648
&self,

rocketmq-remoting/src/protocol/header/message_operation_header.rs

+4
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,8 @@ pub trait TopicRequestHeaderTrait {
4141
fn oneway(&self) -> Option<bool>;
4242

4343
fn with_oneway(&mut self, oneway: bool);
44+
45+
fn queue_id(&self) -> Option<i32>;
46+
47+
fn set_queue_id(&mut self, queue_id: Option<i32>);
4448
}

0 commit comments

Comments
 (0)