Skip to content

[ISSUE #1611]🚀ClientRemotingProcessor supports RequestCode::ConsumeMessageDirectly(309)🔥 #1614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {

async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
Comment on lines +275 to 277
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement consume_message_directly method

The consume_message_directly method currently contains a todo!() placeholder. Please provide an implementation to support direct message consumption functionality.

todo!()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService {

async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
Comment on lines +440 to 442
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement consume_message_directly method

The consume_message_directly method uses todo!() and lacks an implementation. Implement this method to enable direct message consumption.

todo!()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {

async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
Comment on lines +87 to 89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement consume_message_directly method

The consume_message_directly method is currently a stub with todo!(). Implement this method to complete the direct message consumption feature.

todo!()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
Expand Down Expand Up @@ -54,8 +55,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {

async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
Comment on lines +58 to 60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement consume_message_directly method

The method consume_message_directly is unimplemented and contains todo!(). Please provide the necessary implementation to support this functionality.

todo!()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
Expand Down Expand Up @@ -89,8 +90,8 @@ where

pub async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
todo!()
}
Expand Down Expand Up @@ -206,10 +207,10 @@ where
todo!()
}

async fn consume_message_directly(
pub(crate) async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
todo!()
Comment on lines +210 to 215
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implementation missing for ConsumeMessagePopServiceGeneral

The consume_message_directly method is marked with todo!(). This could cause runtime panics in production. Implementation is required before this code can be safely deployed.

}
Expand Down Expand Up @@ -249,8 +250,8 @@ pub trait ConsumeMessageServiceTrait {

async fn consume_message_directly(
&self,
msg: &MessageExt,
broker_name: &str,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult;

async fn submit_consume_request(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
use rocketmq_common::common::FAQUrl;
use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
use rocketmq_remoting::protocol::filter::filter_api::FilterAPI;
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
Expand Down Expand Up @@ -716,6 +717,29 @@ impl DefaultMQPushConsumerImpl {
.execute_pop_pull_request_later(pop_request, time_delay);
}

pub(crate) async fn consume_message_directly(
&self,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> Option<ConsumeMessageDirectlyResult> {
if let Some(consume_message_service) = self.consume_message_service.as_ref() {
Some(
consume_message_service
.consume_message_directly(msg, broker_name)
.await,
)
} else if let Some(consume_message_pop_service) = self.consume_message_pop_service.as_ref()
{
Some(
consume_message_pop_service
.consume_message_directly(msg, broker_name)
.await,
)
} else {
None
}
}

pub(crate) async fn pop_message(&mut self, pop_request: PopRequest) {
unimplemented!("popMessage");
}
Expand Down
17 changes: 17 additions & 0 deletions rocketmq-client/src/consumer/mq_consumer_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::collections::HashSet;

use cheetah_string::CheetahString;
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
Expand Down Expand Up @@ -127,6 +129,21 @@ impl MQConsumerInnerImpl {
}
}
}

pub(crate) async fn consume_message_directly(
&self,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> Option<ConsumeMessageDirectlyResult> {
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
return default_mqpush_consumer_impl
.consume_message_directly(msg, broker_name)
.await;
}
}
None
}
}

impl MQConsumerInner for MQConsumerInnerImpl {
Expand Down
19 changes: 19 additions & 0 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use rand::seq::SliceRandom;
use rocketmq_common::common::base::service_state::ServiceState;
use rocketmq_common::common::constant::PermName;
use rocketmq_common::common::filter::expression_type::ExpressionType;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
use rocketmq_common::common::mix_all;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
Expand Down Expand Up @@ -1182,6 +1184,23 @@ impl MQClientInstance {
Ok(None)
}
}

pub async fn consume_message_directly(
&self,
message: MessageExt,
consumer_group: &CheetahString,
broker_name: Option<CheetahString>,
) -> Option<ConsumeMessageDirectlyResult> {
let consumer_table = self.consumer_table.read().await;
let consumer_inner = consumer_table.get(consumer_group);
if let Some(consumer) = consumer_inner {
consumer
.consume_message_directly(message, broker_name)
.await;
}

None
}
Comment on lines +1188 to +1203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix critical bug in return value handling

The implementation ignores the result of consume_message_directly and always returns None. This could mask successful message consumption results.

Apply this fix:

     pub async fn consume_message_directly(
         &self,
         message: MessageExt,
         consumer_group: &CheetahString,
         broker_name: Option<CheetahString>,
     ) -> Option<ConsumeMessageDirectlyResult> {
         let consumer_table = self.consumer_table.read().await;
         let consumer_inner = consumer_table.get(consumer_group);
         if let Some(consumer) = consumer_inner {
-            consumer
+            return consumer
                 .consume_message_directly(message, broker_name)
-                .await;
+                .await
         }
 
         None
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn consume_message_directly(
&self,
message: MessageExt,
consumer_group: &CheetahString,
broker_name: Option<CheetahString>,
) -> Option<ConsumeMessageDirectlyResult> {
let consumer_table = self.consumer_table.read().await;
let consumer_inner = consumer_table.get(consumer_group);
if let Some(consumer) = consumer_inner {
consumer
.consume_message_directly(message, broker_name)
.await;
}
None
}
pub async fn consume_message_directly(
&self,
message: MessageExt,
consumer_group: &CheetahString,
broker_name: Option<CheetahString>,
) -> Option<ConsumeMessageDirectlyResult> {
let consumer_table = self.consumer_table.read().await;
let consumer_inner = consumer_table.get(consumer_group);
if let Some(consumer) = consumer_inner {
return consumer
.consume_message_directly(message, broker_name)
.await
}
None
}

}

pub fn topic_route_data2topic_publish_info(
Expand Down
54 changes: 53 additions & 1 deletion rocketmq-client/src/implementation/client_remoting_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::net::SocketAddr;
use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_common::common::compression::compressor_factory::CompressorFactory;
use rocketmq_common::common::message::message_decoder;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
Expand All @@ -30,10 +31,13 @@ use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
use rocketmq_remoting::protocol::header::consume_message_directly_result_request_header::ConsumeMessageDirectlyResultRequestHeader;
use rocketmq_remoting::protocol::header::notify_consumer_ids_changed_request_header::NotifyConsumerIdsChangedRequestHeader;
use rocketmq_remoting::protocol::header::reply_message_request_header::ReplyMessageRequestHeader;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::remoting_error::RemotingError::RemotingCommandError;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_remoting::runtime::processor::RequestProcessor;
use rocketmq_remoting::Result;
Expand Down Expand Up @@ -79,7 +83,7 @@ impl RequestProcessor for ClientRemotingProcessor {
unimplemented!("GetConsumerRunningInfo")
}
RequestCode::ConsumeMessageDirectly => {
unimplemented!("ConsumeMessageDirectly")
self.consume_message_directly(channel, ctx, request).await
}
//RPC message handle code
RequestCode::PushReplyMessageToClient => self.receive_reply_message(ctx, request).await,
Expand Down Expand Up @@ -272,4 +276,52 @@ impl ClientRemotingProcessor {
};
Ok(None)
}

async fn consume_message_directly(
&mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
mut request: RemotingCommand,
) -> Result<Option<RemotingCommand>> {
let request_header =
request.decode_command_custom_header::<ConsumeMessageDirectlyResultRequestHeader>()?;
let body = request
.get_body_mut()
.ok_or(RemotingCommandError("body is empty".to_string()))?;
let msg = message_decoder::decode(body, true, true, false, false, false)
.ok_or(RemotingCommandError("decode message failed".to_string()))?;

if let Some(client_instance) = self.client_instance.upgrade() {
let result = client_instance
.consume_message_directly(
msg,
&request_header.consumer_group,
request_header.broker_name.clone(),
)
.await;
if let Some(result) = result {
let body = result
.encode()
.map_err(|_| RemotingCommandError("encode result failed".to_string()))?;
Ok(Some(
RemotingCommand::create_response_command().set_body(body),
))
} else {
warn!("consumeMessageDirectly, consume message failed");
Ok(Some(
RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
.set_remark(format!(
"The Consumer Group <{}> not exist in this consumer",
request_header.consumer_group
)),
))
}
} else {
warn!("consumeMessageDirectly, client_instance is empty");
Ok(Some(
RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
.set_remark("client_instance is empty"),
))
}
}
}
Loading