Skip to content

Commit f1747dc

Browse files
[ISSUE #461]🎉Add StatisticsItem struct 🎉 (#462)
* [ISSUE #461]🎉Add StatisticsItem struct 🎉 * fix ci error
1 parent 84ff741 commit f1747dc

File tree

1 file changed

+269
-0
lines changed

1 file changed

+269
-0
lines changed

rocketmq-common/src/common/statistics/statistics_item.rs

+269
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,272 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::{
18+
sync::{
19+
atomic::{AtomicI64, AtomicU64, Ordering},
20+
Arc, Mutex,
21+
},
22+
time::{SystemTime, UNIX_EPOCH},
23+
};
24+
25+
use crate::common::statistics::interceptor::Interceptor;
26+
27+
pub struct StatisticsItem {
28+
stat_kind: String,
29+
stat_object: String,
30+
item_names: Vec<String>,
31+
item_accumulates: Vec<AtomicI64>,
32+
invoke_times: AtomicI64,
33+
last_timestamp: AtomicU64,
34+
interceptor: Option<Arc<dyn Interceptor + Send + Sync>>,
35+
}
36+
37+
impl StatisticsItem {
38+
pub fn new(stat_kind: &str, stat_object: &str, item_names: Vec<&str>) -> Self {
39+
if item_names.is_empty() {
40+
panic!("StatisticsItem \"itemNames\" is empty");
41+
}
42+
43+
let item_accumulates = item_names.iter().map(|_| AtomicI64::new(0)).collect();
44+
let invoke_times = AtomicI64::new(0);
45+
let last_timestamp = AtomicU64::new(
46+
SystemTime::now()
47+
.duration_since(UNIX_EPOCH)
48+
.unwrap()
49+
.as_secs(),
50+
);
51+
52+
Self {
53+
stat_kind: stat_kind.to_string(),
54+
stat_object: stat_object.to_string(),
55+
item_names: item_names.into_iter().map(String::from).collect(),
56+
item_accumulates,
57+
invoke_times,
58+
last_timestamp,
59+
interceptor: None,
60+
}
61+
}
62+
63+
pub fn inc_items(&self, item_incs: Vec<i64>) {
64+
let len = std::cmp::min(item_incs.len(), self.item_accumulates.len());
65+
for (i, _item) in item_incs.iter().enumerate().take(len) {
66+
self.item_accumulates[i].fetch_add(item_incs[i], Ordering::SeqCst);
67+
}
68+
69+
self.invoke_times.fetch_add(1, Ordering::SeqCst);
70+
self.last_timestamp.store(
71+
SystemTime::now()
72+
.duration_since(UNIX_EPOCH)
73+
.unwrap()
74+
.as_secs(),
75+
Ordering::SeqCst,
76+
);
77+
78+
if let Some(ref interceptor) = self.interceptor {
79+
interceptor.inc(item_incs);
80+
}
81+
}
82+
83+
pub fn all_zeros(&self) -> bool {
84+
if self.invoke_times.load(Ordering::SeqCst) == 0 {
85+
return true;
86+
}
87+
88+
for acc in &self.item_accumulates {
89+
if acc.load(Ordering::SeqCst) != 0 {
90+
return false;
91+
}
92+
}
93+
true
94+
}
95+
96+
// Getters
97+
pub fn stat_kind(&self) -> &str {
98+
&self.stat_kind
99+
}
100+
101+
pub fn stat_object(&self) -> &str {
102+
&self.stat_object
103+
}
104+
105+
pub fn item_names(&self) -> &Vec<String> {
106+
&self.item_names
107+
}
108+
109+
pub fn item_accumulates(&self) -> &Vec<AtomicI64> {
110+
&self.item_accumulates
111+
}
112+
113+
pub fn invoke_times(&self) -> &AtomicI64 {
114+
&self.invoke_times
115+
}
116+
117+
pub fn last_timestamp(&self) -> &AtomicU64 {
118+
&self.last_timestamp
119+
}
120+
121+
pub fn item_accumulate(&self, item_name: &str) -> Option<&AtomicI64> {
122+
self.item_names
123+
.iter()
124+
.position(|name| name == item_name)
125+
.map(|index| &self.item_accumulates[index])
126+
}
127+
128+
pub fn snapshot(&self) -> Self {
129+
let item_accumulates = self
130+
.item_accumulates
131+
.iter()
132+
.map(|acc| AtomicI64::new(acc.load(Ordering::SeqCst)))
133+
.collect();
134+
135+
Self {
136+
stat_kind: self.stat_kind.clone(),
137+
stat_object: self.stat_object.clone(),
138+
item_names: self.item_names.clone(),
139+
item_accumulates,
140+
invoke_times: AtomicI64::new(self.invoke_times.load(Ordering::SeqCst)),
141+
last_timestamp: AtomicU64::new(self.last_timestamp.load(Ordering::SeqCst)),
142+
interceptor: self.interceptor.clone(),
143+
}
144+
}
145+
146+
pub fn subtract(&self, item: &StatisticsItem) -> Self {
147+
if self.stat_kind != item.stat_kind
148+
|| self.stat_object != item.stat_object
149+
|| self.item_names != item.item_names
150+
{
151+
panic!("StatisticsItem's kind, key and itemNames must be exactly the same");
152+
}
153+
154+
let item_accumulates = self
155+
.item_accumulates
156+
.iter()
157+
.zip(&item.item_accumulates)
158+
.map(|(a, b)| AtomicI64::new(a.load(Ordering::SeqCst) - b.load(Ordering::SeqCst)))
159+
.collect();
160+
161+
Self {
162+
stat_kind: self.stat_kind.clone(),
163+
stat_object: self.stat_object.clone(),
164+
item_names: self.item_names.clone(),
165+
item_accumulates,
166+
invoke_times: AtomicI64::new(
167+
self.invoke_times.load(Ordering::SeqCst) - item.invoke_times.load(Ordering::SeqCst),
168+
),
169+
last_timestamp: AtomicU64::new(self.last_timestamp.load(Ordering::SeqCst)),
170+
interceptor: self.interceptor.clone(),
171+
}
172+
}
173+
174+
pub fn get_interceptor(&self) -> Option<Arc<dyn Interceptor + Send + Sync>> {
175+
self.interceptor.clone()
176+
}
177+
178+
pub fn set_interceptor(&mut self, interceptor: Arc<dyn Interceptor + Send + Sync>) {
179+
self.interceptor = Some(interceptor);
180+
}
181+
}
182+
183+
#[cfg(test)]
184+
mod tests {
185+
use std::sync::Arc;
186+
187+
use super::*;
188+
189+
struct TestInterceptor;
190+
191+
impl Interceptor for TestInterceptor {
192+
fn inc(&self, _deltas: Vec<i64>) {}
193+
fn reset(&self) {}
194+
}
195+
196+
#[test]
197+
fn new_statistics_item_initializes_correctly() {
198+
let item = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
199+
assert_eq!(item.stat_kind(), "kind");
200+
assert_eq!(item.stat_object(), "object");
201+
assert_eq!(item.item_names(), &vec!["item1", "item2"]);
202+
}
203+
204+
#[test]
205+
fn inc_items_updates_values_correctly() {
206+
let item = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
207+
item.inc_items(vec![1, 2]);
208+
assert_eq!(
209+
item.item_accumulate("item1")
210+
.unwrap()
211+
.load(Ordering::SeqCst),
212+
1
213+
);
214+
assert_eq!(
215+
item.item_accumulate("item2")
216+
.unwrap()
217+
.load(Ordering::SeqCst),
218+
2
219+
);
220+
}
221+
222+
#[test]
223+
fn all_zeros_returns_true_when_all_zeros() {
224+
let item = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
225+
assert!(item.all_zeros());
226+
}
227+
228+
#[test]
229+
fn all_zeros_returns_false_when_not_all_zeros() {
230+
let item = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
231+
item.inc_items(vec![1, 0]);
232+
assert!(!item.all_zeros());
233+
}
234+
235+
#[test]
236+
fn snapshot_creates_correct_snapshot() {
237+
let item = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
238+
item.inc_items(vec![1, 2]);
239+
let snapshot = item.snapshot();
240+
assert_eq!(
241+
snapshot
242+
.item_accumulate("item1")
243+
.unwrap()
244+
.load(Ordering::SeqCst),
245+
1
246+
);
247+
assert_eq!(
248+
snapshot
249+
.item_accumulate("item2")
250+
.unwrap()
251+
.load(Ordering::SeqCst),
252+
2
253+
);
254+
}
255+
256+
#[test]
257+
fn subtract_creates_correct_subtraction() {
258+
let item1 = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
259+
item1.inc_items(vec![3, 4]);
260+
let item2 = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
261+
item2.inc_items(vec![1, 2]);
262+
let result = item1.subtract(&item2);
263+
assert_eq!(
264+
result
265+
.item_accumulate("item1")
266+
.unwrap()
267+
.load(Ordering::SeqCst),
268+
2
269+
);
270+
assert_eq!(
271+
result
272+
.item_accumulate("item2")
273+
.unwrap()
274+
.load(Ordering::SeqCst),
275+
2
276+
);
277+
}
278+
279+
#[test]
280+
fn set_interceptor_sets_interceptor_correctly() {
281+
let mut item = StatisticsItem::new("kind", "object", vec!["item1", "item2"]);
282+
item.set_interceptor(Arc::new(TestInterceptor));
283+
assert!(item.get_interceptor().is_some());
284+
}
285+
}

0 commit comments

Comments
 (0)