Skip to content

[ISSUE #1742]⚡️Enhancement OrderInfo struct add some methods #1744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 292 additions & 3 deletions rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/

use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::TimeUtils::get_current_millis;
use serde::Deserialize;
use serde::Serialize;
use tracing::warn;
Expand Down Expand Up @@ -121,9 +123,7 @@
);
return;
}
order_info
.offset_next_visible_time
.insert(queue_offset, next_visible_time);
order_info.update_offset_next_visible_time(queue_offset, next_visible_time);

Check warning on line 126 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L126

Added line #L126 was not covered by tests
self.update_lock_free_timestamp(topic, group, queue_id, order_info);
}

Expand Down Expand Up @@ -168,3 +168,292 @@
#[serde(rename = "a")]
attempt_id: String,
}

impl OrderInfo {
/// Builds a list of offsets from a given list of queue offsets.
/// If the list contains only one element, it returns the same list.
/// Otherwise, it returns a list where each element is the difference
/// between the current and the first element.
///
/// # Arguments
///
/// * `queue_offset_list` - A vector of queue offsets.
///
/// # Returns
///
/// A vector of offsets.
pub fn build_offset_list(queue_offset_list: Vec<u64>) -> Vec<u64> {
let mut simple = Vec::new();
if queue_offset_list.len() == 1 {
simple.extend(queue_offset_list);
return simple;
}
let first = queue_offset_list[0];
simple.push(first);
for item in queue_offset_list.iter().skip(1) {
simple.push(*item - first);
}
simple

Check warning on line 196 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L190-L196

Added lines #L190 - L196 were not covered by tests
}
Comment on lines +185 to +197
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential integer underflow in build_offset_list

In the build_offset_list method, subtracting first from *item where both are u64 may result in integer underflow if *item is less than first. Since u64 is an unsigned integer, this underflow will cause a panic in debug builds or wrap around in release builds, leading to incorrect results.

Consider validating that each *item is greater than or equal to first before performing the subtraction, or handle the case where *item is less than first.

 for item in queue_offset_list.iter().skip(1) {
+    if *item < first {
+        // Handle the error, e.g., return an error or skip the value
+    }
     simple.push(*item - first);
 }

Committable suggestion skipped: line range outside the PR's diff.


/// Determines if the current order info needs to be blocked based on the attempt ID
/// and the current invisible time.
///
/// # Arguments
///
/// * `attempt_id` - The attempt ID to check.
/// * `current_invisible_time` - The current invisible time.
///
/// # Returns
///
/// `true` if the order info needs to be blocked, `false` otherwise.
pub fn need_block(&mut self, attempt_id: &str, current_invisible_time: u64) -> bool {
if self.offset_list.is_empty() {
return false;
}
if self.attempt_id == attempt_id {
return false;
}
let num = self.offset_list.len();
if self.invisible_time.is_none() || self.invisible_time.unwrap_or(0) == 0 {
self.invisible_time = Some(current_invisible_time);
}
let current_time = get_current_millis();
for (i, _) in (0..num).enumerate() {
if self.is_not_ack(i) {
let mut next_visible_time = self.pop_time + self.invisible_time.unwrap_or(0);
if let Some(time) = self.offset_next_visible_time.get(&self.get_queue_offset(i)) {
next_visible_time = *time;
}
if current_time < next_visible_time {
return true;
}
}

Check warning on line 231 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L213-L231

Added lines #L213 - L231 were not covered by tests
}
false

Check warning on line 233 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L233

Added line #L233 was not covered by tests
}

/// Gets the lock-free timestamp for the current order info.
///
/// # Returns
///
/// An `Option<u64>` containing the lock-free timestamp if available, `None` otherwise.
pub fn get_lock_free_timestamp(&self) -> Option<u64> {
if self.offset_list.is_empty() {
return None;
}
let current_time = get_current_millis();
for i in 0..self.offset_list.len() {
if self.is_not_ack(i) {
if self.invisible_time.is_none() || self.invisible_time.unwrap_or(0) == 0 {
return None;
}
let mut next_visible_time = self.pop_time + self.invisible_time.unwrap_or(0);
if let Some(time) = self.offset_next_visible_time.get(&self.get_queue_offset(i)) {
next_visible_time = *time;
}
if current_time < next_visible_time {
return Some(next_visible_time);
}
}

Check warning on line 258 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L244-L258

Added lines #L244 - L258 were not covered by tests
}
Some(current_time)

Check warning on line 260 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L260

Added line #L260 was not covered by tests
}

/// Updates the next visible time for a given queue offset.
///
/// # Arguments
///
/// * `queue_offset` - The queue offset to update.
/// * `next_visible_time` - The next visible time to set.
#[inline]
pub fn update_offset_next_visible_time(&mut self, queue_offset: u64, next_visible_time: u64) {
self.offset_next_visible_time
.insert(queue_offset, next_visible_time);
}

Check warning on line 273 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L270-L273

Added lines #L270 - L273 were not covered by tests

/// Gets the next offset for the current order info.
///
/// # Returns
///
/// An `i64` representing the next offset. Returns -2 if the offset list is empty.
pub fn get_next_offset(&self) -> i64 {
if self.offset_list.is_empty() {
return -2;
}
let mut i = 0;
for j in 0..self.offset_list.len() {
if self.is_not_ack(j) {
break;
}
i += 1;

Check warning on line 289 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L283-L289

Added lines #L283 - L289 were not covered by tests
}
if i == self.offset_list.len() {
self.get_queue_offset(self.offset_list.len() - 1) as i64 + 1

Check warning on line 292 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L291-L292

Added lines #L291 - L292 were not covered by tests
} else {
self.get_queue_offset(i) as i64

Check warning on line 294 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L294

Added line #L294 was not covered by tests
}
}

/// Gets the queue offset for a given offset index.
///
/// # Arguments
///
/// * `offset_index` - The index of the offset to get.
///
/// # Returns
///
/// A `u64` representing the queue offset.
pub fn get_queue_offset(&self, offset_index: usize) -> u64 {
if offset_index == 0 {
return self.offset_list[0];
}
self.offset_list[0] + self.offset_list[offset_index]
}

Check warning on line 312 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L307-L312

Added lines #L307 - L312 were not covered by tests

/// Checks if the offset at the given index is not acknowledged.
///
/// # Arguments
///
/// * `offset_index` - The index of the offset to check.
///
/// # Returns
///
/// `true` if the offset is not acknowledged, `false` otherwise.
pub fn is_not_ack(&self, offset_index: usize) -> bool {
if offset_index >= 64 {
return false;
}
(self.commit_offset_bit & (1 << offset_index)) == 0
}

Check warning on line 328 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L323-L328

Added lines #L323 - L328 were not covered by tests

/// Merges the offset consumed count with the previous attempt ID and offset list.
///
/// # Arguments
///
/// * `pre_attempt_id` - The previous attempt ID.
/// * `pre_offset_list` - The previous offset list.
/// * `prev_offset_consumed_count` - The previous offset consumed count.
pub fn merge_offset_consumed_count(
&mut self,
pre_attempt_id: &str,
pre_offset_list: Vec<u64>,
prev_offset_consumed_count: HashMap<u64, i32>,
) {
let mut offset_consumed_count = HashMap::new();
if pre_attempt_id == self.attempt_id {
self.offset_consumed_count = prev_offset_consumed_count;
return;
}
let mut pre_queue_offset_set = HashSet::new();
for (index, _) in pre_offset_list.iter().enumerate() {
pre_queue_offset_set.insert(Self::get_queue_offset_from_list(&pre_offset_list, index));
}
for i in 0..self.offset_list.len() {
let queue_offset = self.get_queue_offset(i);
if pre_queue_offset_set.contains(&queue_offset) {
let mut count = 1;
if let Some(pre_count) = prev_offset_consumed_count.get(&queue_offset) {
count += pre_count;
}
offset_consumed_count.insert(queue_offset, count);
}

Check warning on line 360 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L347-L360

Added lines #L347 - L360 were not covered by tests
}
self.offset_consumed_count = offset_consumed_count;

Check warning on line 362 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L362

Added line #L362 was not covered by tests
}

/// Gets the queue offset from a list of offsets for a given index.
///
/// # Arguments
///
/// * `pre_offset_list` - The list of previous offsets.
/// * `offset_index` - The index of the offset to get.
///
/// # Returns
///
/// A `u64` representing the queue offset.
fn get_queue_offset_from_list(pre_offset_list: &[u64], offset_index: usize) -> u64 {
if offset_index == 0 {
return pre_offset_list[0];
}
pre_offset_list[0] + pre_offset_list[offset_index]
}

Check warning on line 380 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L375-L380

Added lines #L375 - L380 were not covered by tests
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;

#[test]
fn build_offset_list_with_single_element() {
let queue_offset_list = vec![10];
let expected = vec![10];
assert_eq!(OrderInfo::build_offset_list(queue_offset_list), expected);
}

#[test]
fn need_block_returns_false_for_empty_offset_list() {
let mut order_info = OrderInfo {
pop_time: 1000,
invisible_time: Some(3000),
offset_list: vec![],
offset_next_visible_time: HashMap::new(),
offset_consumed_count: HashMap::new(),
last_consume_timestamp: 0,
commit_offset_bit: 0,
attempt_id: "test".to_string(),
};
assert!(!order_info.need_block("another_test", 2000));
}

#[test]
fn get_lock_free_timestamp_returns_none_for_empty_offset_list() {
let order_info = OrderInfo {
pop_time: 1000,
invisible_time: Some(3000),
offset_list: vec![],
offset_next_visible_time: HashMap::new(),
offset_consumed_count: HashMap::new(),
last_consume_timestamp: 0,
commit_offset_bit: 0,
attempt_id: "test".to_string(),
};
assert_eq!(order_info.get_lock_free_timestamp(), None);
}

#[test]
fn get_next_offset_returns_minus_two_for_empty_offset_list() {
let order_info = OrderInfo {
pop_time: 1000,
invisible_time: Some(3000),
offset_list: vec![],
offset_next_visible_time: HashMap::new(),
offset_consumed_count: HashMap::new(),
last_consume_timestamp: 0,
commit_offset_bit: 0,
attempt_id: "test".to_string(),
};
assert_eq!(order_info.get_next_offset(), -2);
}

#[test]
fn merge_offset_consumed_count_with_same_attempt_id() {
let mut order_info = OrderInfo {
pop_time: 0,
invisible_time: None,
offset_list: vec![1, 2, 3],
offset_next_visible_time: HashMap::new(),
offset_consumed_count: HashMap::new(),
last_consume_timestamp: 0,
commit_offset_bit: 0,
attempt_id: "test".to_string(),
};
let pre_offset_list = vec![1, 2];
let prev_offset_consumed_count = HashMap::from([(1, 1), (2, 1)]);
order_info.merge_offset_consumed_count("test", pre_offset_list, prev_offset_consumed_count);
assert_eq!(order_info.offset_consumed_count.get(&1), Some(&1));
assert_eq!(order_info.offset_consumed_count.get(&2), Some(&1));
}
}
Loading