Skip to content

Commit 62c4360

Browse files
authored
[ISSUE #435]⚡️Improving broker receive a single message🎨 (#436)
* Improving broker receive a single message
1 parent be2ab3f commit 62c4360

File tree

8 files changed

+233
-95
lines changed

8 files changed

+233
-95
lines changed

rocketmq-broker/src/mqtrace.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616
*/
1717

1818
pub(crate) mod send_message_context;
19+
pub(crate) mod send_message_hook;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use crate::mqtrace::send_message_context::SendMessageContext;
18+
19+
/// The `SendMessageHook` trait defines a common interface for sending messages.
20+
/// It is designed to be thread-safe and have a static lifetime.
21+
///
22+
/// This trait is composed of three methods:
23+
/// - `hook_name`: Returns a string slice that represents the name of the hook.
24+
/// - `send_message_before`: Called before a message is sent. It takes a reference to a
25+
/// `SendMessageContext`.
26+
/// - `send_message_after`: Called after a message is sent. It also takes a reference to a
27+
/// `SendMessageContext`.
28+
pub trait SendMessageHook: Send + Sync + 'static {
29+
/// Returns the name of the hook.
30+
///
31+
/// # Returns
32+
///
33+
/// A string slice that represents the name of the hook.
34+
fn hook_name(&self) -> &str;
35+
36+
/// Called before a message is sent.
37+
///
38+
/// # Parameters
39+
///
40+
/// * `context`: A reference to a `SendMessageContext`.
41+
fn send_message_before(&self, context: &SendMessageContext);
42+
43+
/// Called after a message is sent.
44+
///
45+
/// # Parameters
46+
///
47+
/// * `context`: A reference to a `SendMessageContext`.
48+
fn send_message_after(&self, context: &SendMessageContext);
49+
}

rocketmq-broker/src/processor.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ use rocketmq_remoting::{
3535
},
3636
protocol::{
3737
header::message_operation_header::{
38-
send_message_request_header::SendMessageRequestHeader, TopicRequestHeaderTrait,
38+
send_message_request_header::SendMessageRequestHeader,
39+
send_message_response_header::SendMessageResponseHeader, TopicRequestHeaderTrait,
3940
},
4041
remoting_command::RemotingCommand,
4142
NamespaceUtil,
@@ -49,7 +50,7 @@ use tracing::{info, warn};
4950

5051
use self::client_manage_processor::ClientManageProcessor;
5152
use crate::{
52-
mqtrace::send_message_context::SendMessageContext,
53+
mqtrace::{send_message_context::SendMessageContext, send_message_hook::SendMessageHook},
5354
processor::{
5455
admin_broker_processor::AdminBrokerProcessor, send_message_processor::SendMessageProcessor,
5556
},
@@ -76,7 +77,6 @@ where
7677
pub(crate) admin_broker_processor: AdminBrokerProcessor,
7778
pub(crate) client_manage_processor: ClientManageProcessor,
7879
}
79-
8080
impl<MS: Clone> Clone for BrokerRequestProcessor<MS> {
8181
fn clone(&self) -> Self {
8282
Self {
@@ -120,9 +120,38 @@ impl<MS: MessageStore + Send + Sync + 'static> RequestProcessor for BrokerReques
120120
pub(crate) struct SendMessageProcessorInner {
121121
pub(crate) broker_config: Arc<BrokerConfig>,
122122
pub(crate) topic_config_manager: TopicConfigManager,
123+
pub(crate) send_message_hook_vec: Arc<parking_lot::RwLock<Vec<Box<dyn SendMessageHook>>>>,
123124
}
124125

125126
impl SendMessageProcessorInner {
127+
pub(crate) fn execute_send_message_hook_before(&self, context: &SendMessageContext) {
128+
for hook in self.send_message_hook_vec.read().iter() {
129+
hook.send_message_before(context);
130+
}
131+
}
132+
133+
pub(crate) fn execute_send_message_hook_after(
134+
&self,
135+
response: Option<&mut RemotingCommand>,
136+
context: &mut SendMessageContext,
137+
) {
138+
for hook in self.send_message_hook_vec.read().iter() {
139+
if let Some(ref response) = response {
140+
if let Some(ref header) =
141+
response.decode_command_custom_header::<SendMessageResponseHeader>()
142+
{
143+
context.msg_id = header.msg_id().to_string();
144+
context.queue_id = Some(header.queue_id());
145+
context.queue_offset = Some(header.queue_offset());
146+
context.code = response.code();
147+
context.error_msg = response.remark().unwrap_or(&"".to_string()).to_string();
148+
}
149+
}
150+
151+
hook.send_message_after(context);
152+
}
153+
}
154+
126155
pub(crate) fn consumer_send_msg_back(
127156
&self,
128157
_ctx: &ConnectionHandlerContext,
@@ -230,6 +259,7 @@ impl SendMessageProcessorInner {
230259
"Sending message to topic[{}] is forbidden.",
231260
request_header.topic.as_str()
232261
)));
262+
return;
233263
}
234264
let mut topic_config = self
235265
.topic_config_manager

0 commit comments

Comments
 (0)