Skip to content

Commit 7ca0d7e

Browse files
authored
[ISSUE #1742]⚡️Enhancement OrderInfo struct add some methods (#1744)
1 parent 18254bb commit 7ca0d7e

File tree

1 file changed

+292
-3
lines changed

1 file changed

+292
-3
lines changed

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

+292-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
*/
1717

1818
use std::collections::HashMap;
19+
use std::collections::HashSet;
1920
use std::ops::Deref;
2021
use std::sync::Arc;
2122

2223
use cheetah_string::CheetahString;
2324
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2425
use rocketmq_common::common::config_manager::ConfigManager;
26+
use rocketmq_common::TimeUtils::get_current_millis;
2527
use serde::Deserialize;
2628
use serde::Serialize;
2729
use tracing::warn;
@@ -121,9 +123,7 @@ impl ConsumerOrderInfoManager {
121123
);
122124
return;
123125
}
124-
order_info
125-
.offset_next_visible_time
126-
.insert(queue_offset, next_visible_time);
126+
order_info.update_offset_next_visible_time(queue_offset, next_visible_time);
127127
self.update_lock_free_timestamp(topic, group, queue_id, order_info);
128128
}
129129

@@ -168,3 +168,292 @@ pub(crate) struct OrderInfo {
168168
#[serde(rename = "a")]
169169
attempt_id: String,
170170
}
171+
172+
impl OrderInfo {
173+
/// Builds a list of offsets from a given list of queue offsets.
174+
/// If the list contains only one element, it returns the same list.
175+
/// Otherwise, it returns a list where each element is the difference
176+
/// between the current and the first element.
177+
///
178+
/// # Arguments
179+
///
180+
/// * `queue_offset_list` - A vector of queue offsets.
181+
///
182+
/// # Returns
183+
///
184+
/// A vector of offsets.
185+
pub fn build_offset_list(queue_offset_list: Vec<u64>) -> Vec<u64> {
186+
let mut simple = Vec::new();
187+
if queue_offset_list.len() == 1 {
188+
simple.extend(queue_offset_list);
189+
return simple;
190+
}
191+
let first = queue_offset_list[0];
192+
simple.push(first);
193+
for item in queue_offset_list.iter().skip(1) {
194+
simple.push(*item - first);
195+
}
196+
simple
197+
}
198+
199+
/// Determines if the current order info needs to be blocked based on the attempt ID
200+
/// and the current invisible time.
201+
///
202+
/// # Arguments
203+
///
204+
/// * `attempt_id` - The attempt ID to check.
205+
/// * `current_invisible_time` - The current invisible time.
206+
///
207+
/// # Returns
208+
///
209+
/// `true` if the order info needs to be blocked, `false` otherwise.
210+
pub fn need_block(&mut self, attempt_id: &str, current_invisible_time: u64) -> bool {
211+
if self.offset_list.is_empty() {
212+
return false;
213+
}
214+
if self.attempt_id == attempt_id {
215+
return false;
216+
}
217+
let num = self.offset_list.len();
218+
if self.invisible_time.is_none() || self.invisible_time.unwrap_or(0) == 0 {
219+
self.invisible_time = Some(current_invisible_time);
220+
}
221+
let current_time = get_current_millis();
222+
for (i, _) in (0..num).enumerate() {
223+
if self.is_not_ack(i) {
224+
let mut next_visible_time = self.pop_time + self.invisible_time.unwrap_or(0);
225+
if let Some(time) = self.offset_next_visible_time.get(&self.get_queue_offset(i)) {
226+
next_visible_time = *time;
227+
}
228+
if current_time < next_visible_time {
229+
return true;
230+
}
231+
}
232+
}
233+
false
234+
}
235+
236+
/// Gets the lock-free timestamp for the current order info.
237+
///
238+
/// # Returns
239+
///
240+
/// An `Option<u64>` containing the lock-free timestamp if available, `None` otherwise.
241+
pub fn get_lock_free_timestamp(&self) -> Option<u64> {
242+
if self.offset_list.is_empty() {
243+
return None;
244+
}
245+
let current_time = get_current_millis();
246+
for i in 0..self.offset_list.len() {
247+
if self.is_not_ack(i) {
248+
if self.invisible_time.is_none() || self.invisible_time.unwrap_or(0) == 0 {
249+
return None;
250+
}
251+
let mut next_visible_time = self.pop_time + self.invisible_time.unwrap_or(0);
252+
if let Some(time) = self.offset_next_visible_time.get(&self.get_queue_offset(i)) {
253+
next_visible_time = *time;
254+
}
255+
if current_time < next_visible_time {
256+
return Some(next_visible_time);
257+
}
258+
}
259+
}
260+
Some(current_time)
261+
}
262+
263+
/// Updates the next visible time for a given queue offset.
264+
///
265+
/// # Arguments
266+
///
267+
/// * `queue_offset` - The queue offset to update.
268+
/// * `next_visible_time` - The next visible time to set.
269+
#[inline]
270+
pub fn update_offset_next_visible_time(&mut self, queue_offset: u64, next_visible_time: u64) {
271+
self.offset_next_visible_time
272+
.insert(queue_offset, next_visible_time);
273+
}
274+
275+
/// Gets the next offset for the current order info.
276+
///
277+
/// # Returns
278+
///
279+
/// An `i64` representing the next offset. Returns -2 if the offset list is empty.
280+
pub fn get_next_offset(&self) -> i64 {
281+
if self.offset_list.is_empty() {
282+
return -2;
283+
}
284+
let mut i = 0;
285+
for j in 0..self.offset_list.len() {
286+
if self.is_not_ack(j) {
287+
break;
288+
}
289+
i += 1;
290+
}
291+
if i == self.offset_list.len() {
292+
self.get_queue_offset(self.offset_list.len() - 1) as i64 + 1
293+
} else {
294+
self.get_queue_offset(i) as i64
295+
}
296+
}
297+
298+
/// Gets the queue offset for a given offset index.
299+
///
300+
/// # Arguments
301+
///
302+
/// * `offset_index` - The index of the offset to get.
303+
///
304+
/// # Returns
305+
///
306+
/// A `u64` representing the queue offset.
307+
pub fn get_queue_offset(&self, offset_index: usize) -> u64 {
308+
if offset_index == 0 {
309+
return self.offset_list[0];
310+
}
311+
self.offset_list[0] + self.offset_list[offset_index]
312+
}
313+
314+
/// Checks if the offset at the given index is not acknowledged.
315+
///
316+
/// # Arguments
317+
///
318+
/// * `offset_index` - The index of the offset to check.
319+
///
320+
/// # Returns
321+
///
322+
/// `true` if the offset is not acknowledged, `false` otherwise.
323+
pub fn is_not_ack(&self, offset_index: usize) -> bool {
324+
if offset_index >= 64 {
325+
return false;
326+
}
327+
(self.commit_offset_bit & (1 << offset_index)) == 0
328+
}
329+
330+
/// Merges the offset consumed count with the previous attempt ID and offset list.
331+
///
332+
/// # Arguments
333+
///
334+
/// * `pre_attempt_id` - The previous attempt ID.
335+
/// * `pre_offset_list` - The previous offset list.
336+
/// * `prev_offset_consumed_count` - The previous offset consumed count.
337+
pub fn merge_offset_consumed_count(
338+
&mut self,
339+
pre_attempt_id: &str,
340+
pre_offset_list: Vec<u64>,
341+
prev_offset_consumed_count: HashMap<u64, i32>,
342+
) {
343+
let mut offset_consumed_count = HashMap::new();
344+
if pre_attempt_id == self.attempt_id {
345+
self.offset_consumed_count = prev_offset_consumed_count;
346+
return;
347+
}
348+
let mut pre_queue_offset_set = HashSet::new();
349+
for (index, _) in pre_offset_list.iter().enumerate() {
350+
pre_queue_offset_set.insert(Self::get_queue_offset_from_list(&pre_offset_list, index));
351+
}
352+
for i in 0..self.offset_list.len() {
353+
let queue_offset = self.get_queue_offset(i);
354+
if pre_queue_offset_set.contains(&queue_offset) {
355+
let mut count = 1;
356+
if let Some(pre_count) = prev_offset_consumed_count.get(&queue_offset) {
357+
count += pre_count;
358+
}
359+
offset_consumed_count.insert(queue_offset, count);
360+
}
361+
}
362+
self.offset_consumed_count = offset_consumed_count;
363+
}
364+
365+
/// Gets the queue offset from a list of offsets for a given index.
366+
///
367+
/// # Arguments
368+
///
369+
/// * `pre_offset_list` - The list of previous offsets.
370+
/// * `offset_index` - The index of the offset to get.
371+
///
372+
/// # Returns
373+
///
374+
/// A `u64` representing the queue offset.
375+
fn get_queue_offset_from_list(pre_offset_list: &[u64], offset_index: usize) -> u64 {
376+
if offset_index == 0 {
377+
return pre_offset_list[0];
378+
}
379+
pre_offset_list[0] + pre_offset_list[offset_index]
380+
}
381+
}
382+
383+
#[cfg(test)]
384+
mod tests {
385+
use std::collections::HashMap;
386+
387+
use super::*;
388+
389+
#[test]
390+
fn build_offset_list_with_single_element() {
391+
let queue_offset_list = vec![10];
392+
let expected = vec![10];
393+
assert_eq!(OrderInfo::build_offset_list(queue_offset_list), expected);
394+
}
395+
396+
#[test]
397+
fn need_block_returns_false_for_empty_offset_list() {
398+
let mut order_info = OrderInfo {
399+
pop_time: 1000,
400+
invisible_time: Some(3000),
401+
offset_list: vec![],
402+
offset_next_visible_time: HashMap::new(),
403+
offset_consumed_count: HashMap::new(),
404+
last_consume_timestamp: 0,
405+
commit_offset_bit: 0,
406+
attempt_id: "test".to_string(),
407+
};
408+
assert!(!order_info.need_block("another_test", 2000));
409+
}
410+
411+
#[test]
412+
fn get_lock_free_timestamp_returns_none_for_empty_offset_list() {
413+
let order_info = OrderInfo {
414+
pop_time: 1000,
415+
invisible_time: Some(3000),
416+
offset_list: vec![],
417+
offset_next_visible_time: HashMap::new(),
418+
offset_consumed_count: HashMap::new(),
419+
last_consume_timestamp: 0,
420+
commit_offset_bit: 0,
421+
attempt_id: "test".to_string(),
422+
};
423+
assert_eq!(order_info.get_lock_free_timestamp(), None);
424+
}
425+
426+
#[test]
427+
fn get_next_offset_returns_minus_two_for_empty_offset_list() {
428+
let order_info = OrderInfo {
429+
pop_time: 1000,
430+
invisible_time: Some(3000),
431+
offset_list: vec![],
432+
offset_next_visible_time: HashMap::new(),
433+
offset_consumed_count: HashMap::new(),
434+
last_consume_timestamp: 0,
435+
commit_offset_bit: 0,
436+
attempt_id: "test".to_string(),
437+
};
438+
assert_eq!(order_info.get_next_offset(), -2);
439+
}
440+
441+
#[test]
442+
fn merge_offset_consumed_count_with_same_attempt_id() {
443+
let mut order_info = OrderInfo {
444+
pop_time: 0,
445+
invisible_time: None,
446+
offset_list: vec![1, 2, 3],
447+
offset_next_visible_time: HashMap::new(),
448+
offset_consumed_count: HashMap::new(),
449+
last_consume_timestamp: 0,
450+
commit_offset_bit: 0,
451+
attempt_id: "test".to_string(),
452+
};
453+
let pre_offset_list = vec![1, 2];
454+
let prev_offset_consumed_count = HashMap::from([(1, 1), (2, 1)]);
455+
order_info.merge_offset_consumed_count("test", pre_offset_list, prev_offset_consumed_count);
456+
assert_eq!(order_info.offset_consumed_count.get(&1), Some(&1));
457+
assert_eq!(order_info.offset_consumed_count.get(&2), Some(&1));
458+
}
459+
}

0 commit comments

Comments
 (0)