Skip to content

Commit 51168e1

Browse files
authored
[ISSUE mxsm#2066]💫Add ConsumeReviveObj struct🚀 (mxsm#2068)
1 parent cd8f47c commit 51168e1

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,98 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::collections::HashMap;
18+
19+
use cheetah_string::CheetahString;
20+
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
21+
1722
pub struct PopReviveService;
23+
24+
struct ConsumeReviveObj {
25+
map: HashMap<CheetahString, PopCheckPoint>,
26+
sort_list: Option<Vec<PopCheckPoint>>,
27+
old_offset: i64,
28+
end_time: i64,
29+
new_offset: i64,
30+
}
31+
32+
impl ConsumeReviveObj {
33+
fn new() -> Self {
34+
Self {
35+
map: HashMap::new(),
36+
sort_list: None,
37+
old_offset: 0,
38+
end_time: 0,
39+
new_offset: 0,
40+
}
41+
}
42+
43+
fn gen_sort_list(&mut self) -> &Vec<PopCheckPoint> {
44+
if self.sort_list.is_none() {
45+
let mut list: Vec<PopCheckPoint> = self.map.values().cloned().collect();
46+
list.sort_by_key(|ck| ck.revive_offset);
47+
self.sort_list = Some(list);
48+
}
49+
self.sort_list.as_ref().unwrap()
50+
}
51+
}
52+
53+
#[cfg(test)]
54+
mod tests {
55+
56+
use cheetah_string::CheetahString;
57+
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
58+
59+
use super::*;
60+
61+
#[test]
62+
fn new_initializes_correctly() {
63+
let obj = ConsumeReviveObj::new();
64+
assert!(obj.map.is_empty());
65+
assert!(obj.sort_list.is_none());
66+
assert_eq!(obj.old_offset, 0);
67+
assert_eq!(obj.end_time, 0);
68+
assert_eq!(obj.new_offset, 0);
69+
}
70+
71+
#[test]
72+
fn gen_sort_list_creates_sorted_list() {
73+
let mut obj = ConsumeReviveObj::new();
74+
let ck1 = PopCheckPoint {
75+
revive_offset: 10,
76+
..Default::default()
77+
};
78+
let ck2 = PopCheckPoint {
79+
revive_offset: 5,
80+
..Default::default()
81+
};
82+
obj.map.insert(CheetahString::from("key1"), ck1.clone());
83+
obj.map.insert(CheetahString::from("key2"), ck2.clone());
84+
85+
let sorted_list = obj.gen_sort_list();
86+
assert_eq!(sorted_list.len(), 2);
87+
assert_eq!(sorted_list[0].revive_offset, 5);
88+
assert_eq!(sorted_list[1].revive_offset, 10);
89+
}
90+
91+
#[test]
92+
fn gen_sort_list_returns_existing_list_if_already_sorted() {
93+
let mut obj = ConsumeReviveObj::new();
94+
let ck1 = PopCheckPoint {
95+
revive_offset: 10,
96+
..Default::default()
97+
};
98+
let ck2 = PopCheckPoint {
99+
revive_offset: 5,
100+
..Default::default()
101+
};
102+
obj.map.insert(CheetahString::from("key1"), ck1.clone());
103+
obj.map.insert(CheetahString::from("key2"), ck2.clone());
104+
105+
let _ = obj.gen_sort_list();
106+
let sorted_list = obj.gen_sort_list();
107+
assert_eq!(sorted_list.len(), 2);
108+
assert_eq!(sorted_list[0].revive_offset, 5);
109+
assert_eq!(sorted_list[1].revive_offset, 10);
110+
}
111+
}

0 commit comments

Comments
 (0)