Skip to content

Commit cc274b0

Browse files
authored
Allocate message queue by config (#1651)
1 parent f756078 commit cc274b0

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

rocketmq-client/src/consumer/rebalance_strategy.rs

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
pub mod allocate_message_queue_averagely;
1818
pub mod allocate_message_queue_averagely_by_circle;
19+
pub mod allocate_message_queue_by_config;
1920
pub mod allocate_message_queue_by_machine_room;
2021
pub mod allocate_message_queue_by_machine_room_nearby;
2122

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use cheetah_string::CheetahString;
18+
use rocketmq_common::common::message::message_queue::MessageQueue;
19+
20+
use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
21+
22+
pub struct AllocateMessageQueueByConfig {
23+
message_queue_list: Vec<MessageQueue>,
24+
}
25+
26+
impl AllocateMessageQueueByConfig {
27+
#[inline]
28+
pub fn new(message_queue_list: Vec<MessageQueue>) -> Self {
29+
Self { message_queue_list }
30+
}
31+
}
32+
33+
impl AllocateMessageQueueStrategy for AllocateMessageQueueByConfig {
34+
fn allocate(
35+
&self,
36+
consumer_group: &CheetahString,
37+
current_cid: &CheetahString,
38+
mq_all: &[MessageQueue],
39+
cid_all: &[CheetahString],
40+
) -> crate::Result<Vec<MessageQueue>> {
41+
Ok(self.message_queue_list.clone())
42+
}
43+
44+
#[inline]
45+
fn get_name(&self) -> &'static str {
46+
"CONFIG"
47+
}
48+
}
49+
50+
#[cfg(test)]
51+
mod tests {
52+
use std::collections::HashMap;
53+
54+
use cheetah_string::CheetahString;
55+
use rocketmq_common::common::message::message_queue::MessageQueue;
56+
57+
use super::*;
58+
59+
#[test]
60+
fn test_allocate_message_queue_by_config() {
61+
let consumer_group = CheetahString::from("test_group");
62+
let current_cid = CheetahString::from("CID_PREFIX1");
63+
let mq_all = create_message_queue_list(4);
64+
let cid_all = create_consumer_id_list(2);
65+
let strategy = AllocateMessageQueueByConfig::new(mq_all.clone());
66+
67+
let mut consumer_allocate_queue = HashMap::new();
68+
for consumer_id in &cid_all {
69+
let queues = strategy
70+
.allocate(&consumer_group, &consumer_id, &mq_all, &cid_all)
71+
.unwrap();
72+
let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.get_queue_id()).collect();
73+
consumer_allocate_queue.insert(consumer_id.clone(), queue_ids);
74+
}
75+
76+
assert_eq!(
77+
consumer_allocate_queue
78+
.get("CID_PREFIX0")
79+
.unwrap()
80+
.as_slice(),
81+
&[0, 1, 2, 3]
82+
);
83+
assert_eq!(
84+
consumer_allocate_queue
85+
.get("CID_PREFIX1")
86+
.unwrap()
87+
.as_slice(),
88+
&[0, 1, 2, 3]
89+
);
90+
}
91+
92+
fn create_consumer_id_list(size: usize) -> Vec<CheetahString> {
93+
(0..size)
94+
.map(|i| format!("CID_PREFIX{}", i).into())
95+
.collect()
96+
}
97+
98+
fn create_message_queue_list(size: usize) -> Vec<MessageQueue> {
99+
(0..size)
100+
.map(|i| MessageQueue::from_parts("topic", "broker", i as i32))
101+
.collect()
102+
}
103+
}

0 commit comments

Comments
 (0)