Skip to content

Commit 8e92073

Browse files
authored
[ISSUE #990]🚀Support NOTIFY_CONSUMER_IDS_CHANGED(request code 40) for client (#993)
1 parent 8ee0033 commit 8e92073

File tree

12 files changed

+461
-117
lines changed

12 files changed

+461
-117
lines changed

‎rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ use crate::consumer::consumer_impl::re_balance::Rebalance;
6969
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
7070
use crate::consumer::listener::message_listener::MessageListener;
7171
use crate::consumer::mq_consumer_inner::MQConsumerInner;
72+
use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
7273
use crate::consumer::pull_callback::DefaultPullCallback;
7374
use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore;
7475
use crate::consumer::store::offset_store::OffsetStore;
@@ -95,7 +96,6 @@ const ASYNC_TIMEOUT: u64 = 3000;
9596
const DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED: bool = false;
9697
const _1MB: u64 = 1024 * 1024;
9798

98-
#[derive(Clone)]
9999
pub struct DefaultMQPushConsumerImpl {
100100
pub(crate) global_lock: Arc<Mutex<()>>,
101101
pub(crate) pull_time_delay_mills_when_exception: u64,
@@ -106,7 +106,7 @@ pub struct DefaultMQPushConsumerImpl {
106106
consume_message_hook_list: Vec<Arc<Box<dyn ConsumeMessageHook + Send + Sync>>>,
107107
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
108108
service_state: ArcRefCellWrapper<ServiceState>,
109-
client_instance: Option<ArcRefCellWrapper<MQClientInstance<DefaultMQPushConsumerImpl>>>,
109+
client_instance: Option<ArcRefCellWrapper<MQClientInstance>>,
110110
pub(crate) pull_api_wrapper: Option<ArcRefCellWrapper<PullAPIWrapper>>,
111111
pause: Arc<AtomicBool>,
112112
consume_orderly: bool,
@@ -342,14 +342,18 @@ impl DefaultMQPushConsumerImpl {
342342
.consume_message_pop_orderly_service
343343
.start(wrapper);
344344
}
345-
let consumer_impl = self.clone();
346345
self.client_instance
347346
.as_mut()
348347
.unwrap()
349-
.register_consumer(self.consumer_config.consumer_group.as_str(), consumer_impl)
348+
.register_consumer(
349+
self.consumer_config.consumer_group.as_str(),
350+
MQConsumerInnerImpl {
351+
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
352+
},
353+
)
350354
.await;
351-
352-
self.client_instance.as_mut().unwrap().start().await?;
355+
let cloned = self.client_instance.as_mut().cloned().unwrap();
356+
self.client_instance.as_mut().unwrap().start(cloned).await?;
353357
info!(
354358
"the consumer [{}] start OK, message_model={}, isUnitMode={}",
355359
self.consumer_config.consumer_group,
@@ -1006,7 +1010,7 @@ impl DefaultMQPushConsumerImpl {
10061010
class_filter,
10071011
);
10081012
let subscription_data = subscription_data.unwrap();
1009-
let this = self.clone();
1013+
let this = self.default_mqpush_consumer_impl.clone().unwrap();
10101014
let result = self
10111015
.pull_api_wrapper
10121016
.as_mut()
@@ -1186,6 +1190,8 @@ impl DefaultMQPushConsumerImpl {
11861190
.as_mut()
11871191
.unwrap()
11881192
.mq_client_api_impl
1193+
.as_mut()
1194+
.unwrap()
11891195
.consumer_send_message_back(
11901196
broker_addr.as_str(),
11911197
broker_name.as_ref().unwrap().as_str(),
@@ -1258,8 +1264,8 @@ impl DefaultMQPushConsumerImpl {
12581264
}
12591265

12601266
impl MQConsumerInner for DefaultMQPushConsumerImpl {
1261-
fn group_name(&self) -> &str {
1262-
self.consumer_config.consumer_group()
1267+
fn group_name(&self) -> String {
1268+
self.consumer_config.consumer_group().to_string()
12631269
}
12641270

12651271
fn message_model(&self) -> MessageModel {

‎rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ use rocketmq_common::ArcRefCellWrapper;
1919
use tracing::info;
2020
use tracing::warn;
2121

22-
use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
2322
use crate::consumer::consumer_impl::pop_request::PopRequest;
2423
use crate::consumer::consumer_impl::pull_request::PullRequest;
25-
use crate::consumer::mq_consumer_inner::MQConsumerInner;
2624
use crate::factory::mq_client_instance::MQClientInstance;
2725

2826
#[derive(Clone)]
@@ -38,32 +36,23 @@ impl PullMessageService {
3836
pull_tx: None,
3937
}
4038
}
41-
pub async fn start<C>(&mut self, instance: MQClientInstance<C>)
42-
where
43-
C: MQConsumerInner + Clone,
44-
{
39+
pub async fn start(&mut self, instance: ArcRefCellWrapper<MQClientInstance>) {
4540
let (pop_tx, mut pop_rx) = tokio::sync::mpsc::channel(1024 * 4);
4641
let (pull_tx, mut pull_rx) = tokio::sync::mpsc::channel(1024 * 4);
4742
self.pop_tx = Some(pop_tx);
4843
self.pull_tx = Some(pull_tx);
49-
let instance_wrapper = ArcRefCellWrapper::new(instance);
50-
let instance_wrapper_clone = instance_wrapper.clone();
44+
//let instance_wrapper = ArcRefCellWrapper::new(instance);
45+
let instance_wrapper_clone = instance.clone();
5146
tokio::spawn(async move {
5247
info!(
5348
">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PopRequest] \
5449
started<<<<<<<<<<<<<<<<<<<<<<<<<<<<"
5550
);
5651
while let Some(request) = pop_rx.recv().await {
57-
if let Some(mut consumer) = instance_wrapper
58-
.select_consumer(request.get_consumer_group())
59-
.await
52+
if let Some(mut consumer) =
53+
instance.select_consumer(request.get_consumer_group()).await
6054
{
61-
consumer
62-
.as_any_mut()
63-
.downcast_mut::<DefaultMQPushConsumerImpl>()
64-
.unwrap()
65-
.pop_message(request)
66-
.await;
55+
consumer.pop_message(request).await;
6756
} else {
6857
warn!(
6958
"No matched consumer for the PopRequest {}, drop it",
@@ -82,12 +71,7 @@ impl PullMessageService {
8271
.select_consumer(request.get_consumer_group())
8372
.await
8473
{
85-
consumer
86-
.as_any_mut()
87-
.downcast_mut::<DefaultMQPushConsumerImpl>()
88-
.unwrap()
89-
.pull_message(request)
90-
.await;
74+
consumer.pull_message(request).await;
9175
} else {
9276
warn!(
9377
"No matched consumer for the PullRequest {},drop it",

‎rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_service.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ use std::sync::Arc;
1818
use std::time::Duration;
1919

2020
use once_cell::sync::Lazy;
21+
use rocketmq_common::ArcRefCellWrapper;
2122
use tokio::select;
2223
use tokio::sync::Notify;
2324
use tokio::time::Instant;
2425
use tracing::info;
2526

26-
use crate::consumer::mq_consumer_inner::MQConsumerInner;
2727
use crate::factory::mq_client_instance::MQClientInstance;
2828

2929
static WAIT_INTERVAL: Lazy<Duration> = Lazy::new(|| {
@@ -56,10 +56,7 @@ impl RebalanceService {
5656
}
5757
}
5858

59-
pub async fn start<C>(&mut self, mut instance: MQClientInstance<C>)
60-
where
61-
C: MQConsumerInner + Clone,
62-
{
59+
pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {
6360
let notify = self.notify.clone();
6461
tokio::spawn(async move {
6562
let mut last_rebalance_timestamp = Instant::now();

‎rocketmq-client/src/consumer/mq_consumer_inner.rs

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@ use std::collections::HashSet;
1919

2020
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
2121
use rocketmq_common::common::message::message_queue::MessageQueue;
22+
use rocketmq_common::WeakCellWrapper;
2223
use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
2324
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
2425
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
2526
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
2627

28+
use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
29+
use crate::consumer::consumer_impl::pop_request::PopRequest;
30+
use crate::consumer::consumer_impl::pull_request::PullRequest;
2731
use crate::Result;
2832
#[trait_variant::make(MQConsumerInner: Send)]
2933
pub trait MQConsumerInnerLocal: MQConsumerInnerAny + Sync + 'static {
30-
fn group_name(&self) -> &str;
34+
fn group_name(&self) -> String;
3135

3236
fn message_model(&self) -> MessageModel;
3337

@@ -67,3 +71,152 @@ impl<T: MQConsumerInner> MQConsumerInnerAny for T {
6771
self
6872
}
6973
}
74+
75+
#[derive(Clone)]
76+
pub(crate) struct MQConsumerInnerImpl {
77+
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
78+
}
79+
80+
impl MQConsumerInnerImpl {
81+
pub(crate) async fn pop_message(&mut self, pop_request: PopRequest) {
82+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
83+
if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
84+
default_mqpush_consumer_impl.pop_message(pop_request).await;
85+
}
86+
}
87+
}
88+
89+
pub(crate) async fn pull_message(&mut self, pull_request: PullRequest) {
90+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
91+
if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
92+
default_mqpush_consumer_impl
93+
.pull_message(pull_request)
94+
.await;
95+
}
96+
}
97+
}
98+
}
99+
100+
impl MQConsumerInner for MQConsumerInnerImpl {
101+
fn group_name(&self) -> String {
102+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
103+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
104+
return MQConsumerInner::group_name(default_mqpush_consumer_impl.as_ref());
105+
}
106+
}
107+
panic!("default_mqpush_consumer_impl is None");
108+
}
109+
110+
fn message_model(&self) -> MessageModel {
111+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
112+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
113+
return MQConsumerInner::message_model(default_mqpush_consumer_impl.as_ref());
114+
}
115+
}
116+
panic!("default_mqpush_consumer_impl is None");
117+
}
118+
119+
fn consume_type(&self) -> ConsumeType {
120+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
121+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
122+
return MQConsumerInner::consume_type(default_mqpush_consumer_impl.as_ref());
123+
}
124+
}
125+
panic!("default_mqpush_consumer_impl is None");
126+
}
127+
128+
fn consume_from_where(&self) -> ConsumeFromWhere {
129+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
130+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
131+
return MQConsumerInner::consume_from_where(default_mqpush_consumer_impl.as_ref());
132+
}
133+
}
134+
panic!("default_mqpush_consumer_impl is None");
135+
}
136+
137+
fn subscriptions(&self) -> HashSet<SubscriptionData> {
138+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
139+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
140+
return MQConsumerInner::subscriptions(default_mqpush_consumer_impl.as_ref());
141+
}
142+
}
143+
panic!("default_mqpush_consumer_impl is None");
144+
}
145+
146+
fn do_rebalance(&self) {
147+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
148+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
149+
return MQConsumerInner::do_rebalance(default_mqpush_consumer_impl.as_ref());
150+
}
151+
}
152+
panic!("default_mqpush_consumer_impl is None");
153+
}
154+
155+
async fn try_rebalance(&self) -> Result<bool> {
156+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
157+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
158+
return MQConsumerInner::try_rebalance(default_mqpush_consumer_impl.as_ref()).await;
159+
}
160+
}
161+
panic!("default_mqpush_consumer_impl is None");
162+
}
163+
164+
async fn persist_consumer_offset(&self) {
165+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
166+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
167+
return MQConsumerInner::persist_consumer_offset(
168+
default_mqpush_consumer_impl.as_ref(),
169+
)
170+
.await;
171+
}
172+
}
173+
panic!("default_mqpush_consumer_impl is None");
174+
}
175+
176+
async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
177+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
178+
if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
179+
return MQConsumerInner::update_topic_subscribe_info(
180+
default_mqpush_consumer_impl.as_mut(),
181+
topic,
182+
info,
183+
)
184+
.await;
185+
}
186+
}
187+
panic!("default_mqpush_consumer_impl is None");
188+
}
189+
190+
async fn is_subscribe_topic_need_update(&self, topic: &str) -> bool {
191+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
192+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
193+
return MQConsumerInner::is_subscribe_topic_need_update(
194+
default_mqpush_consumer_impl.as_ref(),
195+
topic,
196+
)
197+
.await;
198+
}
199+
}
200+
panic!("default_mqpush_consumer_impl is None");
201+
}
202+
203+
fn is_unit_mode(&self) -> bool {
204+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
205+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
206+
return MQConsumerInner::is_unit_mode(default_mqpush_consumer_impl.as_ref());
207+
}
208+
}
209+
panic!("default_mqpush_consumer_impl is None");
210+
}
211+
212+
fn consumer_running_info(&self) -> ConsumerRunningInfo {
213+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
214+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
215+
return MQConsumerInner::consumer_running_info(
216+
default_mqpush_consumer_impl.as_ref(),
217+
);
218+
}
219+
}
220+
panic!("default_mqpush_consumer_impl is None");
221+
}
222+
}

0 commit comments

Comments
 (0)