Skip to content

Commit a94311c

Browse files
authored
[ISSUE #972]🔥Supports client clusting consume🚀 (#980)
1 parent d0e911a commit a94311c

38 files changed

+2674
-186
lines changed

rocketmq-client/examples/quickstart/consumer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub const MESSAGE_COUNT: usize = 1;
2727
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";
2828
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
2929
pub const TOPIC: &str = "TopicTest";
30-
pub const TAG: &str = "TagA";
30+
pub const TAG: &str = "*";
3131

3232
#[rocketmq::main]
3333
pub async fn main() -> Result<()> {
@@ -53,8 +53,8 @@ pub struct MyMessageListener;
5353
impl MessageListenerConcurrently for MyMessageListener {
5454
fn consume_message(
5555
&self,
56-
msgs: Vec<MessageExt>,
57-
_context: ConsumeConcurrentlyContext,
56+
msgs: &[&MessageExt],
57+
_context: &ConsumeConcurrentlyContext,
5858
) -> Result<ConsumeConcurrentlyStatus> {
5959
for msg in msgs {
6060
println!("Receive message: {:?}", msg);

rocketmq-client/src/base/client_config.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,18 @@ impl ClientConfig {
142142
)
143143
}
144144

145-
pub fn queue_with_namespace(&mut self, queue: &mut MessageQueue) {
145+
pub fn queue_with_namespace(&mut self, queue: MessageQueue) -> MessageQueue {
146146
if let Some(namespace) = self.get_namespace() {
147147
if !namespace.is_empty() {
148-
queue.set_topic(NamespaceUtil::wrap_namespace(
148+
let mut message_queue = queue.clone();
149+
message_queue.set_topic(NamespaceUtil::wrap_namespace(
149150
namespace.as_str(),
150151
queue.get_topic(),
151152
));
153+
return message_queue;
152154
}
153155
}
156+
queue
154157
}
155158

156159
pub fn get_namespace(&mut self) -> Option<String> {

rocketmq-client/src/consumer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ pub mod message_selector;
2424
pub mod mq_consumer;
2525
pub(crate) mod mq_consumer_inner;
2626
pub mod mq_push_consumer;
27-
mod pull_callback;
28-
mod pull_result;
29-
mod pull_status;
27+
pub(crate) mod pull_callback;
28+
pub(crate) mod pull_result;
29+
pub(crate) mod pull_status;
3030
pub mod rebalance_strategy;
31-
mod store;
31+
pub(crate) mod store;

rocketmq-client/src/consumer/consumer_impl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub(crate) mod process_queue;
2929
pub(crate) mod pull_api_wrapper;
3030
pub(crate) mod pull_message_service;
3131
pub(crate) mod pull_request;
32+
pub(crate) mod pull_request_ext;
3233
pub(crate) mod re_balance;
3334

3435
pub(crate) static PULL_MAX_IDLE_TIME: Lazy<u64> = Lazy::new(|| {

0 commit comments

Comments
 (0)