Skip to content

Commit e94c4a6

Browse files
authored
[ISSUE #1611]🚀ClientRemotingProcessor supports RequestCode::ConsumeMessageDirectly(309)🔥 (#1614)
1 parent 4718cd1 commit e94c4a6

9 files changed

+130
-16
lines changed

‎rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
272272

273273
async fn consume_message_directly(
274274
&self,
275-
msg: &MessageExt,
276-
broker_name: &str,
275+
msg: MessageExt,
276+
broker_name: Option<CheetahString>,
277277
) -> ConsumeMessageDirectlyResult {
278278
todo!()
279279
}

‎rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService {
437437

438438
async fn consume_message_directly(
439439
&self,
440-
msg: &MessageExt,
441-
broker_name: &str,
440+
msg: MessageExt,
441+
broker_name: Option<CheetahString>,
442442
) -> ConsumeMessageDirectlyResult {
443443
todo!()
444444
}

‎rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
8484

8585
async fn consume_message_directly(
8686
&self,
87-
msg: &MessageExt,
88-
broker_name: &str,
87+
msg: MessageExt,
88+
broker_name: Option<CheetahString>,
8989
) -> ConsumeMessageDirectlyResult {
9090
todo!()
9191
}

‎rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
use std::sync::Arc;
1818

19+
use cheetah_string::CheetahString;
1920
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
2021
use rocketmq_common::common::message::message_ext::MessageExt;
2122
use rocketmq_common::common::message::message_queue::MessageQueue;
@@ -54,8 +55,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
5455

5556
async fn consume_message_directly(
5657
&self,
57-
msg: &MessageExt,
58-
broker_name: &str,
58+
msg: MessageExt,
59+
broker_name: Option<CheetahString>,
5960
) -> ConsumeMessageDirectlyResult {
6061
todo!()
6162
}

‎rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
use std::sync::Arc;
1818

19+
use cheetah_string::CheetahString;
1920
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
2021
use rocketmq_common::common::message::message_ext::MessageExt;
2122
use rocketmq_common::common::message::message_queue::MessageQueue;
@@ -89,8 +90,8 @@ where
8990

9091
pub async fn consume_message_directly(
9192
&self,
92-
msg: &MessageExt,
93-
broker_name: &str,
93+
msg: MessageExt,
94+
broker_name: Option<CheetahString>,
9495
) -> ConsumeMessageDirectlyResult {
9596
todo!()
9697
}
@@ -206,10 +207,10 @@ where
206207
todo!()
207208
}
208209

209-
async fn consume_message_directly(
210+
pub(crate) async fn consume_message_directly(
210211
&self,
211-
msg: &MessageExt,
212-
broker_name: &str,
212+
msg: MessageExt,
213+
broker_name: Option<CheetahString>,
213214
) -> ConsumeMessageDirectlyResult {
214215
todo!()
215216
}
@@ -249,8 +250,8 @@ pub trait ConsumeMessageServiceTrait {
249250

250251
async fn consume_message_directly(
251252
&self,
252-
msg: &MessageExt,
253-
broker_name: &str,
253+
msg: MessageExt,
254+
broker_name: Option<CheetahString>,
254255
) -> ConsumeMessageDirectlyResult;
255256

256257
async fn submit_consume_request(

‎rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
3838
use rocketmq_common::common::FAQUrl;
3939
use rocketmq_common::MessageAccessor::MessageAccessor;
4040
use rocketmq_common::TimeUtils::get_current_millis;
41+
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
4142
use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
4243
use rocketmq_remoting::protocol::filter::filter_api::FilterAPI;
4344
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
@@ -716,6 +717,29 @@ impl DefaultMQPushConsumerImpl {
716717
.execute_pop_pull_request_later(pop_request, time_delay);
717718
}
718719

720+
pub(crate) async fn consume_message_directly(
721+
&self,
722+
msg: MessageExt,
723+
broker_name: Option<CheetahString>,
724+
) -> Option<ConsumeMessageDirectlyResult> {
725+
if let Some(consume_message_service) = self.consume_message_service.as_ref() {
726+
Some(
727+
consume_message_service
728+
.consume_message_directly(msg, broker_name)
729+
.await,
730+
)
731+
} else if let Some(consume_message_pop_service) = self.consume_message_pop_service.as_ref()
732+
{
733+
Some(
734+
consume_message_pop_service
735+
.consume_message_directly(msg, broker_name)
736+
.await,
737+
)
738+
} else {
739+
None
740+
}
741+
}
742+
719743
pub(crate) async fn pop_message(&mut self, pop_request: PopRequest) {
720744
unimplemented!("popMessage");
721745
}

‎rocketmq-client/src/consumer/mq_consumer_inner.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use std::collections::HashSet;
1919

2020
use cheetah_string::CheetahString;
2121
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
22+
use rocketmq_common::common::message::message_ext::MessageExt;
2223
use rocketmq_common::common::message::message_queue::MessageQueue;
24+
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
2325
use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
2426
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
2527
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
@@ -127,6 +129,21 @@ impl MQConsumerInnerImpl {
127129
}
128130
}
129131
}
132+
133+
pub(crate) async fn consume_message_directly(
134+
&self,
135+
msg: MessageExt,
136+
broker_name: Option<CheetahString>,
137+
) -> Option<ConsumeMessageDirectlyResult> {
138+
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
139+
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
140+
return default_mqpush_consumer_impl
141+
.consume_message_directly(msg, broker_name)
142+
.await;
143+
}
144+
}
145+
None
146+
}
130147
}
131148

132149
impl MQConsumerInner for MQConsumerInnerImpl {

‎rocketmq-client/src/factory/mq_client_instance.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ use rand::seq::SliceRandom;
2727
use rocketmq_common::common::base::service_state::ServiceState;
2828
use rocketmq_common::common::constant::PermName;
2929
use rocketmq_common::common::filter::expression_type::ExpressionType;
30+
use rocketmq_common::common::message::message_ext::MessageExt;
3031
use rocketmq_common::common::message::message_queue::MessageQueue;
3132
use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
3233
use rocketmq_common::common::mix_all;
3334
use rocketmq_common::TimeUtils::get_current_millis;
3435
use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
36+
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
3537
use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
3638
use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
3739
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
@@ -1182,6 +1184,23 @@ impl MQClientInstance {
11821184
Ok(None)
11831185
}
11841186
}
1187+
1188+
pub async fn consume_message_directly(
1189+
&self,
1190+
message: MessageExt,
1191+
consumer_group: &CheetahString,
1192+
broker_name: Option<CheetahString>,
1193+
) -> Option<ConsumeMessageDirectlyResult> {
1194+
let consumer_table = self.consumer_table.read().await;
1195+
let consumer_inner = consumer_table.get(consumer_group);
1196+
if let Some(consumer) = consumer_inner {
1197+
consumer
1198+
.consume_message_directly(message, broker_name)
1199+
.await;
1200+
}
1201+
1202+
None
1203+
}
11851204
}
11861205

11871206
pub fn topic_route_data2topic_publish_info(

‎rocketmq-client/src/implementation/client_remoting_processor.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::net::SocketAddr;
1919
use bytes::Bytes;
2020
use cheetah_string::CheetahString;
2121
use rocketmq_common::common::compression::compressor_factory::CompressorFactory;
22+
use rocketmq_common::common::message::message_decoder;
2223
use rocketmq_common::common::message::message_ext::MessageExt;
2324
use rocketmq_common::common::message::MessageConst;
2425
use rocketmq_common::common::message::MessageTrait;
@@ -30,10 +31,13 @@ use rocketmq_remoting::code::request_code::RequestCode;
3031
use rocketmq_remoting::code::response_code::ResponseCode;
3132
use rocketmq_remoting::net::channel::Channel;
3233
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
34+
use rocketmq_remoting::protocol::header::consume_message_directly_result_request_header::ConsumeMessageDirectlyResultRequestHeader;
3335
use rocketmq_remoting::protocol::header::notify_consumer_ids_changed_request_header::NotifyConsumerIdsChangedRequestHeader;
3436
use rocketmq_remoting::protocol::header::reply_message_request_header::ReplyMessageRequestHeader;
3537
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
3638
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
39+
use rocketmq_remoting::protocol::RemotingSerializable;
40+
use rocketmq_remoting::remoting_error::RemotingError::RemotingCommandError;
3741
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
3842
use rocketmq_remoting::runtime::processor::RequestProcessor;
3943
use rocketmq_remoting::Result;
@@ -79,7 +83,7 @@ impl RequestProcessor for ClientRemotingProcessor {
7983
unimplemented!("GetConsumerRunningInfo")
8084
}
8185
RequestCode::ConsumeMessageDirectly => {
82-
unimplemented!("ConsumeMessageDirectly")
86+
self.consume_message_directly(channel, ctx, request).await
8387
}
8488
//RPC message handle code
8589
RequestCode::PushReplyMessageToClient => self.receive_reply_message(ctx, request).await,
@@ -272,4 +276,52 @@ impl ClientRemotingProcessor {
272276
};
273277
Ok(None)
274278
}
279+
280+
async fn consume_message_directly(
281+
&mut self,
282+
channel: Channel,
283+
ctx: ConnectionHandlerContext,
284+
mut request: RemotingCommand,
285+
) -> Result<Option<RemotingCommand>> {
286+
let request_header =
287+
request.decode_command_custom_header::<ConsumeMessageDirectlyResultRequestHeader>()?;
288+
let body = request
289+
.get_body_mut()
290+
.ok_or(RemotingCommandError("body is empty".to_string()))?;
291+
let msg = message_decoder::decode(body, true, true, false, false, false)
292+
.ok_or(RemotingCommandError("decode message failed".to_string()))?;
293+
294+
if let Some(client_instance) = self.client_instance.upgrade() {
295+
let result = client_instance
296+
.consume_message_directly(
297+
msg,
298+
&request_header.consumer_group,
299+
request_header.broker_name.clone(),
300+
)
301+
.await;
302+
if let Some(result) = result {
303+
let body = result
304+
.encode()
305+
.map_err(|_| RemotingCommandError("encode result failed".to_string()))?;
306+
Ok(Some(
307+
RemotingCommand::create_response_command().set_body(body),
308+
))
309+
} else {
310+
warn!("consumeMessageDirectly, consume message failed");
311+
Ok(Some(
312+
RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
313+
.set_remark(format!(
314+
"The Consumer Group <{}> not exist in this consumer",
315+
request_header.consumer_group
316+
)),
317+
))
318+
}
319+
} else {
320+
warn!("consumeMessageDirectly, client_instance is empty");
321+
Ok(Some(
322+
RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
323+
.set_remark("client_instance is empty"),
324+
))
325+
}
326+
}
275327
}

0 commit comments

Comments
 (0)