Skip to content

Commit 13eae18

Browse files
committed
[ISSUE #1763]🚀Implement ConsumerOrderInfoManager#auto_clean method function🔥
1 parent b419985 commit 13eae18

File tree

5 files changed

+169
-16
lines changed

5 files changed

+169
-16
lines changed

‎rocketmq-broker/src/broker_runtime.rs

+11-7
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pub(crate) struct BrokerRuntime {
103103
#[cfg(feature = "local_file_store")]
104104
subscription_group_manager: Arc<SubscriptionGroupManager<DefaultMessageStore>>,
105105
consumer_filter_manager: Arc<ConsumerFilterManager>,
106-
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
106+
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<DefaultMessageStore>>,
107107
#[cfg(feature = "local_file_store")]
108108
message_store: Option<ArcMut<DefaultMessageStore>>,
109109
#[cfg(feature = "local_file_store")]
@@ -154,7 +154,7 @@ impl Clone for BrokerRuntime {
154154
consumer_offset_manager: self.consumer_offset_manager.clone(),
155155
subscription_group_manager: self.subscription_group_manager.clone(),
156156
consumer_filter_manager: Arc::new(Default::default()),
157-
consumer_order_info_manager: Arc::new(Default::default()),
157+
consumer_order_info_manager: self.consumer_order_info_manager.clone(),
158158
message_store: self.message_store.clone(),
159159
broker_stats: self.broker_stats.clone(),
160160
schedule_message_service: self.schedule_message_service.clone(),
@@ -241,19 +241,23 @@ impl BrokerRuntime {
241241
topic_route_info_manager.clone(),
242242
broker_outer_api.clone(),
243243
));
244+
let subscription_group_manager =
245+
Arc::new(SubscriptionGroupManager::new(broker_config.clone(), None));
246+
let consumer_order_info_manager = Arc::new(ConsumerOrderInfoManager::new(
247+
broker_config.clone(),
248+
Arc::new(topic_config_manager.clone()),
249+
subscription_group_manager.clone(),
250+
));
244251
Self {
245252
broker_config: broker_config.clone(),
246253
message_store_config,
247254
server_config,
248255
topic_config_manager,
249256
topic_queue_mapping_manager,
250257
consumer_offset_manager: ConsumerOffsetManager::new(broker_config.clone(), None),
251-
subscription_group_manager: Arc::new(SubscriptionGroupManager::new(
252-
broker_config.clone(),
253-
None,
254-
)),
258+
subscription_group_manager,
255259
consumer_filter_manager: Arc::new(Default::default()),
256-
consumer_order_info_manager: Arc::new(Default::default()),
260+
consumer_order_info_manager,
257261
message_store: None,
258262
broker_stats: None,
259263
schedule_message_service: Default::default(),

‎rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

+131-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919
use std::collections::HashSet;
20+
use std::fmt::Display;
2021
use std::ops::Deref;
2122
use std::sync::Arc;
2223

@@ -27,24 +28,46 @@ use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2728
use rocketmq_common::TimeUtils::get_current_millis;
2829
use serde::Deserialize;
2930
use serde::Serialize;
31+
use tracing::info;
3032
use tracing::warn;
3133

3234
use crate::broker_path_config_helper::get_consumer_order_info_path;
3335
use crate::offset::manager::consumer_order_info_lock_manager::ConsumerOrderInfoLockManager;
36+
use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager;
37+
use crate::topic::manager::topic_config_manager::TopicConfigManager;
3438

3539
const TOPIC_GROUP_SEPARATOR: &str = "@";
3640
const CLEAN_SPAN_FROM_LAST: u64 = 24 * 3600 * 1000;
3741

38-
#[derive(Default)]
39-
pub(crate) struct ConsumerOrderInfoManager {
42+
pub(crate) struct ConsumerOrderInfoManager<MS> {
4043
pub(crate) broker_config: Arc<BrokerConfig>,
4144
pub(crate) consumer_order_info_wrapper: parking_lot::Mutex<ConsumerOrderInfoWrapper>,
4245
pub(crate) consumer_order_info_lock_manager: Option<ConsumerOrderInfoLockManager>,
46+
pub(crate) topic_config_manager: Arc<TopicConfigManager>,
47+
pub(crate) subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
48+
}
49+
50+
impl<MS> ConsumerOrderInfoManager<MS> {
51+
pub fn new(
52+
broker_config: Arc<BrokerConfig>,
53+
topic_config_manager: Arc<TopicConfigManager>,
54+
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
55+
) -> ConsumerOrderInfoManager<MS> {
56+
Self {
57+
broker_config,
58+
consumer_order_info_wrapper: parking_lot::Mutex::new(
59+
ConsumerOrderInfoWrapper::default(),
60+
),
61+
consumer_order_info_lock_manager: None,
62+
topic_config_manager,
63+
subscription_group_manager,
64+
}
65+
}
4366
}
4467

4568
//Fully implemented will be removed
4669
#[allow(unused_variables)]
47-
impl ConfigManager for ConsumerOrderInfoManager {
70+
impl<MS> ConfigManager for ConsumerOrderInfoManager<MS> {
4871
fn config_file_path(&self) -> String {
4972
get_consumer_order_info_path(self.broker_config.store_path_root_dir.as_str())
5073
}
@@ -81,8 +104,92 @@ impl ConfigManager for ConsumerOrderInfoManager {
81104
}
82105
}
83106

84-
impl ConsumerOrderInfoManager {
85-
fn auto_clean(&self) {}
107+
impl<MS> ConsumerOrderInfoManager<MS> {
108+
pub fn auto_clean(&self) {
109+
let mut consumer_order_info_wrapper = self.consumer_order_info_wrapper.lock();
110+
let table = &mut consumer_order_info_wrapper.table;
111+
112+
// Iterate through the `table` entries (topic@group -> queueId -> OrderInfo)
113+
let mut keys_to_remove = Vec::new(); // Temporary storage for keys to remove
114+
115+
for (topic_at_group, qs) in table.iter_mut() {
116+
let arrays: Vec<&str> = topic_at_group.split('@').collect();
117+
if arrays.len() != 2 {
118+
continue;
119+
}
120+
let topic = arrays[0];
121+
let group = arrays[1];
122+
123+
let topic_config = self
124+
.topic_config_manager
125+
.select_topic_config(&CheetahString::from(topic));
126+
if topic_config.is_none() {
127+
info!(
128+
"Topic not exist, Clean order info, {}:{:?}",
129+
topic_at_group, qs
130+
);
131+
keys_to_remove.push(topic_at_group.clone());
132+
continue;
133+
}
134+
let subscription_group_wrapper = self
135+
.subscription_group_manager
136+
.subscription_group_wrapper()
137+
.lock();
138+
let subscription_group_config = subscription_group_wrapper
139+
.subscription_group_table()
140+
.get(&CheetahString::from(group));
141+
if subscription_group_config.is_none() {
142+
info!(
143+
"Group not exist, Clean order info, {}:{:?}",
144+
topic_at_group, qs
145+
);
146+
keys_to_remove.push(topic_at_group.clone());
147+
continue;
148+
}
149+
150+
if qs.is_empty() {
151+
info!(
152+
"Order table is empty, Clean order info, {}:{:?}",
153+
topic_at_group, qs
154+
);
155+
keys_to_remove.push(topic_at_group.clone());
156+
continue;
157+
}
158+
let topic_config = topic_config.unwrap();
159+
// Clean individual queues in the current topic@group
160+
let mut queues_to_remove = Vec::new();
161+
for (queue_id, order_info) in qs.iter_mut() {
162+
if *queue_id == topic_config.read_queue_nums as i32 {
163+
queues_to_remove.push(*queue_id);
164+
info!(
165+
"Queue not exist, Clean order info, {}:{}, {}",
166+
topic_at_group, order_info, topic_config
167+
);
168+
continue;
169+
}
170+
}
171+
172+
// Remove stale or invalid queues
173+
for queue_id in queues_to_remove {
174+
qs.remove(&queue_id);
175+
info!(
176+
"Removed queue {} for topic@group {}",
177+
queue_id, topic_at_group
178+
);
179+
}
180+
181+
// If all queues are removed, mark topic@group for removal
182+
if qs.is_empty() {
183+
keys_to_remove.push(topic_at_group.clone());
184+
}
185+
}
186+
187+
// Now, remove all topics/groups from the table that need to be cleaned
188+
for key in keys_to_remove {
189+
table.remove(&key);
190+
info!("Removed topic@group {}", key);
191+
}
192+
}
86193

87194
pub fn update_next_visible_time(
88195
&self,
@@ -167,6 +274,25 @@ pub(crate) struct OrderInfo {
167274
attempt_id: String,
168275
}
169276

277+
impl Display for OrderInfo {
278+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279+
write!(
280+
f,
281+
"OrderInfo {{ popTime: {}, invisibleTime: {:?}, offsetList: {:?}, \
282+
offsetNextVisibleTime: {:?}, offsetConsumedCount: {:?}, lastConsumeTimestamp: {}, \
283+
commitOffsetBit: {}, attemptId: {} }}",
284+
self.pop_time,
285+
self.invisible_time,
286+
self.offset_list,
287+
self.offset_next_visible_time,
288+
self.offset_consumed_count,
289+
self.last_consume_timestamp,
290+
self.commit_offset_bit,
291+
self.attempt_id
292+
)
293+
}
294+
}
295+
170296
impl OrderInfo {
171297
/// Builds a list of offsets from a given list of queue offsets.
172298
/// If the list contains only one element, it returns the same list.

‎rocketmq-broker/src/processor/change_invisible_time_processor.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct ChangeInvisibleTimeProcessor<MS> {
5959
topic_config_manager: TopicConfigManager,
6060
message_store: ArcMut<MS>,
6161
consumer_offset_manager: Arc<ConsumerOffsetManager>,
62-
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
62+
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<MS>>,
6363
broker_stats_manager: Arc<BrokerStatsManager>,
6464
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
6565
escape_bridge: ArcMut<EscapeBridge<MS>>,
@@ -74,7 +74,7 @@ impl<MS> ChangeInvisibleTimeProcessor<MS> {
7474
topic_config_manager: TopicConfigManager,
7575
message_store: ArcMut<MS>,
7676
consumer_offset_manager: Arc<ConsumerOffsetManager>,
77-
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
77+
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<MS>>,
7878
broker_stats_manager: Arc<BrokerStatsManager>,
7979
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
8080
escape_bridge: ArcMut<EscapeBridge<MS>>,

‎rocketmq-broker/src/subscription/manager/subscription_group_manager.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub const TOPIC_MAX_LENGTH: usize = 127;
3838

3939
pub(crate) struct SubscriptionGroupManager<MS> {
4040
pub(crate) broker_config: Arc<BrokerConfig>,
41-
subscription_group_wrapper: Arc<parking_lot::Mutex<SubscriptionGroupWrapper>>,
41+
pub(crate) subscription_group_wrapper: Arc<parking_lot::Mutex<SubscriptionGroupWrapper>>,
4242
pub(crate) message_store: Option<MS>,
4343
}
4444

@@ -55,6 +55,10 @@ impl<MS> SubscriptionGroupManager<MS> {
5555
message_store,
5656
}
5757
}
58+
59+
pub fn subscription_group_wrapper(&self) -> &Arc<parking_lot::Mutex<SubscriptionGroupWrapper>> {
60+
&self.subscription_group_wrapper
61+
}
5862
}
5963

6064
impl<MS> ConfigManager for SubscriptionGroupManager<MS> {
@@ -199,7 +203,7 @@ where
199203

200204
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
201205
#[serde(rename_all = "camelCase")]
202-
struct SubscriptionGroupWrapper {
206+
pub(crate) struct SubscriptionGroupWrapper {
203207
subscription_group_table: HashMap<CheetahString, SubscriptionGroupConfig>,
204208
forbidden_table: HashMap<CheetahString, HashMap<CheetahString, i32>>,
205209
data_version: DataVersion,

‎rocketmq-common/src/common/config.rs

+19
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
use std::collections::HashMap;
19+
use std::fmt::Display;
1920

2021
use cheetah_string::CheetahString;
2122
use serde::Deserialize;
@@ -39,6 +40,24 @@ pub struct TopicConfig {
3940
pub attributes: HashMap<CheetahString, CheetahString>,
4041
}
4142

43+
impl Display for TopicConfig {
44+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45+
write!(
46+
f,
47+
"TopicConfig {{ topic_name: {:?}, read_queue_nums: {}, write_queue_nums: {}, perm: \
48+
{}, topic_filter_type: {}, topic_sys_flag: {}, order: {}, attributes: {:?} }}",
49+
self.topic_name,
50+
self.read_queue_nums,
51+
self.write_queue_nums,
52+
self.perm,
53+
self.topic_filter_type,
54+
self.topic_sys_flag,
55+
self.order,
56+
self.attributes
57+
)
58+
}
59+
}
60+
4261
impl Default for TopicConfig {
4362
fn default() -> Self {
4463
Self {

0 commit comments

Comments
 (0)