Skip to content

Commit 6abdf71

Browse files
authored
[ISSUE #3154]🚀Implement NotificationProcessor (#3157)
1 parent 677daa7 commit 6abdf71

File tree

6 files changed

+378
-21
lines changed

6 files changed

+378
-21
lines changed

rocketmq-broker/src/broker_runtime.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ impl BrokerRuntime {
577577
));
578578
self.inner.ack_message_processor = Some(ack_message_processor.clone());
579579

580-
let notification_processor = ArcMut::new(NotificationProcessor::default());
580+
let notification_processor = NotificationProcessor::new(self.inner.clone());
581581
self.inner.notification_processor = Some(notification_processor.clone());
582582
BrokerRequestProcessor {
583583
send_message_processor: ArcMut::new(send_message_processor),
@@ -1335,7 +1335,7 @@ pub(crate) struct BrokerRuntimeInner<MS> {
13351335
//Processor
13361336
pop_message_processor: Option<ArcMut<PopMessageProcessor<MS>>>,
13371337
ack_message_processor: Option<ArcMut<AckMessageProcessor<MS>>>,
1338-
notification_processor: Option<ArcMut<NotificationProcessor>>,
1338+
notification_processor: Option<ArcMut<NotificationProcessor<MS>>>,
13391339
broker_attached_plugins: Vec<Arc<dyn BrokerAttachedPlugin>>,
13401340
}
13411341

@@ -2147,7 +2147,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
21472147
unsafe { self.ack_message_processor.as_ref().unwrap_unchecked() }
21482148
}
21492149

2150-
pub fn notification_processor_unchecked(&self) -> &ArcMut<NotificationProcessor> {
2150+
pub fn notification_processor_unchecked(&self) -> &ArcMut<NotificationProcessor<MS>> {
21512151
unsafe { self.notification_processor.as_ref().unwrap_unchecked() }
21522152
}
21532153

rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ where
8181

8282
self.broker_runtime_inner
8383
.notification_processor_unchecked()
84-
.notify_message_arriving_full(
84+
.notify_message_arriving(
8585
topic.clone(),
8686
queue_id,
8787
tags_code,

rocketmq-broker/src/processor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub struct BrokerRequestProcessor<MS, TS> {
6767
pub(crate) pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
6868
pub(crate) ack_message_processor: ArcMut<AckMessageProcessor<MS>>,
6969
pub(crate) change_invisible_time_processor: ArcMut<ChangeInvisibleTimeProcessor<MS>>,
70-
pub(crate) notification_processor: ArcMut<NotificationProcessor>,
70+
pub(crate) notification_processor: ArcMut<NotificationProcessor<MS>>,
7171
pub(crate) polling_info_processor: ArcMut<PollingInfoProcessor>,
7272
pub(crate) reply_message_processor: ArcMut<ReplyMessageProcessor<MS, TS>>,
7373
pub(crate) query_message_processor: ArcMut<QueryMessageProcessor<MS>>,

0 commit comments

Comments
 (0)