Skip to content

Commit eb4ab3f

Browse files
authored
[ISSUE #457]🚀Enhance the functionality of BrokerStatsManager (#458)
1 parent d229846 commit eb4ab3f

File tree

8 files changed

+617
-0
lines changed

8 files changed

+617
-0
lines changed

rocketmq-common/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,6 @@ once_cell = { workspace = true }
5454
tempfile = "3.10.1"
5555
trait-variant.workspace = true
5656
time = "0.3.36"
57+
dashmap = "5.5.3"
5758
[dev-dependencies]
5859
mockall = "0.12.1"

rocketmq-common/src/common/stats.rs

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
pub mod call_snapshot;
1818
pub mod moment_stats_item;
19+
pub mod moment_stats_item_set;
20+
pub mod stats_item;
1921
pub mod stats_snapshot;
2022

2123
pub struct Stats;

rocketmq-common/src/common/stats/moment_stats_item.rs

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use tracing::info;
2727

2828
use crate::{TimeUtils::get_current_millis, UtilAll::compute_next_minutes_time_millis};
2929

30+
#[derive(Clone)]
3031
pub struct MomentStatsItem {
3132
value: Arc<AtomicI64>,
3233
stats_name: String,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::sync::{Arc, Mutex};
18+
19+
use dashmap::DashMap;
20+
use tokio::{
21+
task,
22+
time::{interval, Duration},
23+
};
24+
25+
use crate::{
26+
common::stats::moment_stats_item::MomentStatsItem, TimeUtils::get_current_millis,
27+
UtilAll::compute_next_minutes_time_millis,
28+
};
29+
30+
#[derive(Clone)]
31+
pub struct MomentStatsItemSet {
32+
stats_item_table: Arc<DashMap<String, MomentStatsItem>>,
33+
stats_name: String,
34+
scheduled_task: Arc<Mutex<Option<task::JoinHandle<()>>>>,
35+
}
36+
37+
impl MomentStatsItemSet {
38+
pub fn new(stats_name: String) -> Self {
39+
let stats_item_table = Arc::new(DashMap::new());
40+
let scheduled_task = Arc::new(Mutex::new(None));
41+
let set = MomentStatsItemSet {
42+
stats_item_table,
43+
stats_name,
44+
scheduled_task,
45+
};
46+
set.init();
47+
set
48+
}
49+
50+
pub fn get_stats_item_table(&self) -> Arc<DashMap<String, MomentStatsItem>> {
51+
Arc::clone(&self.stats_item_table)
52+
}
53+
54+
pub fn get_stats_name(&self) -> &str {
55+
&self.stats_name
56+
}
57+
58+
pub fn init(&self) {
59+
let stats_item_table = Arc::clone(&self.stats_item_table);
60+
let initial_delay = Duration::from_millis(
61+
(compute_next_minutes_time_millis() as i64 - get_current_millis() as i64)
62+
.unsigned_abs(),
63+
);
64+
65+
let mut interval = tokio::time::interval(Duration::from_secs(300));
66+
67+
task::spawn(async move {
68+
tokio::time::sleep(initial_delay).await;
69+
loop {
70+
interval.tick().await;
71+
MomentStatsItemSet::print_at_minutes(&stats_item_table);
72+
}
73+
});
74+
}
75+
76+
fn print_at_minutes(stats_item_table: &DashMap<String, MomentStatsItem>) {
77+
for entry in stats_item_table.iter() {
78+
entry.value().print_at_minutes();
79+
}
80+
}
81+
82+
pub fn set_value(&self, stats_key: &str, value: i32) {
83+
let stats_item = self.get_and_create_stats_item(stats_key.to_string());
84+
stats_item
85+
.get_value()
86+
.store(value as i64, std::sync::atomic::Ordering::Relaxed);
87+
}
88+
89+
pub fn del_value_by_infix_key(&self, stats_key: &str, separator: &str) {
90+
let to_remove: Vec<String> = self
91+
.stats_item_table
92+
.iter()
93+
.filter(|entry| {
94+
entry
95+
.key()
96+
.contains(&format!("{}{}{}", separator, stats_key, separator))
97+
})
98+
.map(|entry| entry.key().clone())
99+
.collect();
100+
for key in to_remove {
101+
self.stats_item_table.remove(&key);
102+
}
103+
}
104+
105+
pub fn del_value_by_suffix_key(&self, stats_key: &str, separator: &str) {
106+
let to_remove: Vec<String> = self
107+
.stats_item_table
108+
.iter()
109+
.filter(|entry| {
110+
entry
111+
.key()
112+
.ends_with(&format!("{}{}", separator, stats_key))
113+
})
114+
.map(|entry| entry.key().clone())
115+
.collect();
116+
for key in to_remove {
117+
self.stats_item_table.remove(&key);
118+
}
119+
}
120+
121+
pub fn get_and_create_stats_item(&self, stats_key: String) -> MomentStatsItem {
122+
if let Some(stats_item) = self.stats_item_table.get(&stats_key) {
123+
return stats_item.clone();
124+
}
125+
126+
let new_item = MomentStatsItem::new(self.stats_name.clone(), stats_key.clone());
127+
self.stats_item_table.insert(stats_key, new_item.clone());
128+
new_item
129+
}
130+
}
131+
132+
#[cfg(test)]
133+
mod tests {
134+
use std::sync::Arc;
135+
136+
use dashmap::DashMap;
137+
138+
use super::*;
139+
140+
#[tokio::test]
141+
async fn moment_stats_item_set_initializes_with_empty_table() {
142+
let stats_set = MomentStatsItemSet::new("TestName".to_string());
143+
assert!(stats_set.get_stats_item_table().is_empty());
144+
}
145+
146+
#[tokio::test]
147+
async fn moment_stats_item_set_returns_correct_stats_name() {
148+
let stats_set = MomentStatsItemSet::new("TestName".to_string());
149+
assert_eq!(stats_set.get_stats_name(), "TestName");
150+
}
151+
152+
#[tokio::test]
153+
async fn moment_stats_item_set_creates_and_returns_stats_item() {
154+
let stats_set = MomentStatsItemSet::new("TestName".to_string());
155+
let stats_item = stats_set.get_and_create_stats_item("TestKey".to_string());
156+
assert_eq!(stats_item.get_stats_name(), "TestName");
157+
assert_eq!(stats_item.get_stats_key(), "TestKey");
158+
}
159+
160+
#[tokio::test]
161+
async fn moment_stats_item_set_sets_and_gets_value() {
162+
let stats_set = MomentStatsItemSet::new("TestName".to_string());
163+
stats_set.set_value("TestKey", 10);
164+
let stats_item = stats_set.get_and_create_stats_item("TestKey".to_string());
165+
assert_eq!(
166+
stats_item
167+
.get_value()
168+
.load(std::sync::atomic::Ordering::Relaxed),
169+
10
170+
);
171+
}
172+
173+
#[tokio::test]
174+
async fn moment_stats_item_set_deletes_value_by_infix_key() {
175+
let stats_set = MomentStatsItemSet::new("TestName".to_string());
176+
stats_set.set_value("_TestKey_", 10);
177+
stats_set.del_value_by_infix_key("TestKey", "_");
178+
assert!(stats_set.get_stats_item_table().is_empty());
179+
}
180+
181+
#[tokio::test]
182+
async fn moment_stats_item_set_deletes_value_by_suffix_key() {
183+
let stats_set = MomentStatsItemSet::new("TestName".to_string());
184+
stats_set.set_value("_TestKey", 10);
185+
stats_set.del_value_by_suffix_key("TestKey", "_");
186+
assert!(stats_set.get_stats_item_table().is_empty());
187+
}
188+
}

0 commit comments

Comments
 (0)