Skip to content

[ISSUE #966]🚀Support client consumer message-3🚀 #968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
* limitations under the License.
*/
use rocketmq_client::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
use rocketmq_client::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
use rocketmq_client::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
use rocketmq_client::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
use rocketmq_client::consumer::mq_push_consumer::MQPushConsumer;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
Expand All @@ -37,14 +41,24 @@ pub async fn main() -> Result<()> {
.consumer_group(CONSUMER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
consumer.subscribe(TOPIC, "*")?;
consumer.register_message_listener_concurrently(MyMessageListener);
consumer.start().await?;

Ok(())
}

/*consumer.register_message_listener_concurrently(|msgs, _context| {
pub struct MyMessageListener;

impl MessageListenerConcurrently for MyMessageListener {
fn consume_message(
&self,
msgs: Vec<MessageExt>,
_context: ConsumeConcurrentlyContext,
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
println!("Receive message: {:?}", msg);
}
Ok(())
});*/
consumer.start().await?;

Ok(())
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
}
Comment on lines +51 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of New Message Listener Implementation

The implementation of MyMessageListener in the example consumer file introduces concurrent message handling capabilities:

  1. Correctness and Logic:

    • The consume_message method processes each message and logs its content. This is a simple and effective demonstration of message processing.
    • The method returns a ConsumeConcurrentlyStatus::ConsumeSuccess which is appropriate for this example.
  2. Best Practices:

    • Using println! for logging in examples is acceptable, but for more robust applications, integrating a logging framework would be advisable.
  3. Error Handling:

    • The method handles the basic case well. However, adding error handling for potential issues during message processing could make the example more comprehensive.

Overall, the example is well-implemented for demonstration purposes but could benefit from more detailed error handling and the use of a logging framework for more serious applications.

}
38 changes: 16 additions & 22 deletions rocketmq-client/src/base/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_remoting::code::response_code::ResponseCode;

use crate::error::MQClientError::MQClientException;
use crate::error::MQClientError::MQClientErr;
use crate::producer::default_mq_producer::ProducerConfig;
use crate::Result;

Expand All @@ -35,21 +35,18 @@

pub fn check_group(group: &str) -> Result<()> {
if group.trim().is_empty() {
return Err(MQClientException(
-1,
"the specified group is blank".to_string(),
));
return Err(MQClientErr(-1, "the specified group is blank".to_string()));

Check warning on line 38 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L38

Added line #L38 was not covered by tests
}

if group.len() > Self::CHARACTER_MAX_LENGTH {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 42 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L42

Added line #L42 was not covered by tests
-1,
"the specified group is longer than group max length 255.".to_string(),
));
}

if TopicValidator::is_topic_or_group_illegal(group) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 49 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L49

Added line #L49 was not covered by tests
-1,
format!(
"the specified group[{}] contains illegal characters, allowing only \
Expand All @@ -67,7 +64,7 @@
M: MessageTrait,
{
if msg.is_none() {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 67 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L67

Added line #L67 was not covered by tests
ResponseCode::MessageIllegal as i32,
"the message is null".to_string(),
));
Expand All @@ -77,22 +74,22 @@
Self::is_not_allowed_send_topic(msg.get_topic())?;

if msg.get_body().is_none() {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 77 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L77

Added line #L77 was not covered by tests
ResponseCode::MessageIllegal as i32,
"the message body is null".to_string(),
));
}

let length = msg.get_body().unwrap().len();
if length == 0 {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 85 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L85

Added line #L85 was not covered by tests
ResponseCode::MessageIllegal as i32,
"the message body length is zero".to_string(),
));
}

if length > producer_config.max_message_size() as usize {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 92 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L92

Added line #L92 was not covered by tests
ResponseCode::MessageIllegal as i32,
format!(
"the message body size over max value, MAX: {}",
Expand All @@ -104,7 +101,7 @@
let lmq_path = msg.get_user_property(MessageConst::PROPERTY_INNER_MULTI_DISPATCH);
if let Some(value) = lmq_path {
if value.contains(std::path::MAIN_SEPARATOR) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 104 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L104

Added line #L104 was not covered by tests
ResponseCode::MessageIllegal as i32,
format!(
"INNER_MULTI_DISPATCH {} can not contains {} character",
Expand All @@ -120,14 +117,11 @@

pub fn check_topic(topic: &str) -> Result<()> {
if topic.trim().is_empty() {
return Err(MQClientException(
-1,
"The specified topic is blank".to_string(),
));
return Err(MQClientErr(-1, "The specified topic is blank".to_string()));

Check warning on line 120 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L120

Added line #L120 was not covered by tests
}

if topic.len() > Self::TOPIC_MAX_LENGTH {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 124 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L124

Added line #L124 was not covered by tests
-1,
format!(
"The specified topic is longer than topic max length {}.",
Expand All @@ -137,7 +131,7 @@
}

if TopicValidator::is_topic_or_group_illegal(topic) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 134 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L134

Added line #L134 was not covered by tests
-1,
format!(
"The specified topic[{}] contains illegal characters, allowing only \
Expand All @@ -152,7 +146,7 @@

pub fn is_system_topic(topic: &str) -> Result<()> {
if TopicValidator::is_system_topic(topic) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 149 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L149

Added line #L149 was not covered by tests
-1,
format!("The topic[{}] is conflict with system topic.", topic),
));
Expand All @@ -162,7 +156,7 @@

pub fn is_not_allowed_send_topic(topic: &str) -> Result<()> {
if TopicValidator::is_not_allowed_send_topic(topic) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 159 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L159

Added line #L159 was not covered by tests
-1,
format!("Sending message to topic[{}] is forbidden.", topic),
));
Expand All @@ -173,7 +167,7 @@

pub fn check_topic_config(topic_config: &TopicConfig) -> Result<()> {
if !PermName::is_valid(topic_config.perm) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 170 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L170

Added line #L170 was not covered by tests
ResponseCode::NoPermission as i32,
format!("topicPermission value: {} is invalid.", topic_config.perm),
));
Expand All @@ -185,7 +179,7 @@
pub fn check_broker_config(broker_config: &HashMap<String, String>) -> Result<()> {
if let Some(broker_permission) = broker_config.get("brokerPermission") {
if !PermName::is_valid(broker_permission.parse().unwrap()) {
return Err(MQClientException(
return Err(MQClientErr(

Check warning on line 182 in rocketmq-client/src/base/validators.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/validators.rs#L182

Added line #L182 was not covered by tests
-1,
format!("brokerPermission value: {} is invalid.", broker_permission),
));
Expand Down
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ pub mod mq_push_consumer;
mod pull_callback;
mod pull_result;
mod pull_status;
pub mod rebalance_strategy;
mod store;
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ pub trait AllocateMessageQueueStrategy: Send + Sync {
cid_all: &[String],
) -> Result<Vec<MessageQueue>>;

fn get_name(&self) -> &str;
fn get_name(&self) -> &'static str;
}
Loading
Loading