From 1a744762e93fb4b6e0a78fe852a7bd3b7e6a84b0 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 1 Dec 2024 17:53:51 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1482]=E2=99=BB=EF=B8=8FRefactor=20cre?= =?UTF-8?q?ate=20MQClientErr=20replace=20with=20mq=5Fclient=5Ferr!=20macro?= =?UTF-8?q?=F0=9F=94=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-client/src/base/validators.rs | 72 ++++++------ .../default_mq_push_consumer_impl.rs | 94 ++++++++-------- .../consumer_impl/pull_api_wrapper.rs | 20 ++-- .../re_balance/rebalance_push_impl.rs | 39 +++---- .../consumer/store/local_file_offset_store.rs | 12 +- .../store/remote_broker_offset_store.rs | 12 +- .../src/factory/mq_client_instance.rs | 8 +- .../src/implementation/mq_admin_impl.rs | 7 +- .../src/implementation/mq_client_api_impl.rs | 33 +++--- .../src/producer/default_mq_producer.rs | 7 +- .../producer_impl/default_mq_producer_impl.rs | 105 ++++++------------ rocketmq-client/src/utils/message_util.rs | 9 +- rocketmq/src/shutdown.rs | 1 - 13 files changed, 173 insertions(+), 246 deletions(-) diff --git a/rocketmq-client/src/base/validators.rs b/rocketmq-client/src/base/validators.rs index 16668d8f..443f292c 100644 --- a/rocketmq-client/src/base/validators.rs +++ b/rocketmq-client/src/base/validators.rs @@ -24,8 +24,7 @@ use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::topic::TopicValidator; use rocketmq_remoting::code::response_code::ResponseCode; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError::MQClientErr; +use crate::mq_client_err; use crate::producer::default_mq_producer::ProducerConfig; use crate::Result; @@ -37,21 +36,19 @@ impl Validators { pub fn check_group(group: &str) -> Result<()> { if group.trim().is_empty() { - return Err(MQClientErr(ClientErr::new("the specified group is blank"))); + return mq_client_err!("the specified group is blank"); } if group.len() > Self::CHARACTER_MAX_LENGTH { - return Err(MQClientErr(ClientErr::new( - "the specified group is longer than group max length 255.", - ))); + return mq_client_err!("the specified group is longer than group max length 255."); } if TopicValidator::is_topic_or_group_illegal(group) { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "the specified group[{}] contains illegal characters, allowing only \ ^[%|a-zA-Z0-9_-]+$", group - )))); + )); } Ok(()) } @@ -61,38 +58,38 @@ impl Validators { M: MessageTrait, { if msg.is_none() { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ResponseCode::MessageIllegal as i32, - "the message is null".to_string(), - ))); + "the message is null".to_string() + ); } let msg = msg.unwrap(); Self::check_topic(msg.get_topic())?; Self::is_not_allowed_send_topic(msg.get_topic())?; if msg.get_body().is_none() { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ResponseCode::MessageIllegal as i32, - "the message body is null".to_string(), - ))); + "the message body is null".to_string() + ); } let length = msg.get_body().unwrap().len(); if length == 0 { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ResponseCode::MessageIllegal as i32, - "the message body length is zero".to_string(), - ))); + "the message body length is zero".to_string() + ); } if length > producer_config.max_message_size() as usize { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ResponseCode::MessageIllegal as i32, format!( "the message body size over max value, MAX: {}", producer_config.max_message_size() - ), - ))); + ) + ); } let lmq_path = msg.get_user_property(&CheetahString::from_static_str( @@ -100,14 +97,14 @@ impl Validators { )); if let Some(value) = lmq_path { if value.contains(std::path::MAIN_SEPARATOR) { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ResponseCode::MessageIllegal as i32, format!( "INNER_MULTI_DISPATCH {} can not contains {} character", value, std::path::MAIN_SEPARATOR - ), - ))); + ) + ); } } @@ -116,22 +113,22 @@ impl Validators { pub fn check_topic(topic: &str) -> Result<()> { if topic.trim().is_empty() { - return Err(MQClientErr(ClientErr::new("The specified topic is blank"))); + return mq_client_err!("The specified topic is blank"); } if topic.len() > Self::TOPIC_MAX_LENGTH { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The specified topic is longer than topic max length {}.", Self::TOPIC_MAX_LENGTH - )))); + )); } if TopicValidator::is_topic_or_group_illegal(topic) { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The specified topic[{}] contains illegal characters, allowing only \ ^[%|a-zA-Z0-9_-]+$", topic - )))); + )); } Ok(()) @@ -139,20 +136,17 @@ impl Validators { pub fn is_system_topic(topic: &str) -> Result<()> { if TopicValidator::is_system_topic(topic) { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The topic[{}] is conflict with system topic.", topic - )))); + )); } Ok(()) } pub fn is_not_allowed_send_topic(topic: &str) -> Result<()> { if TopicValidator::is_not_allowed_send_topic(topic) { - return Err(MQClientErr(ClientErr::new(format!( - "Sending message to topic[{}] is forbidden.", - topic - )))); + return mq_client_err!(format!("Sending message to topic[{}] is forbidden.", topic)); } Ok(()) @@ -160,10 +154,10 @@ impl Validators { pub fn check_topic_config(topic_config: &TopicConfig) -> Result<()> { if !PermName::is_valid(topic_config.perm) { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ResponseCode::NoPermission as i32, - format!("topicPermission value: {} is invalid.", topic_config.perm), - ))); + format!("topicPermission value: {} is invalid.", topic_config.perm) + ); } Ok(()) @@ -172,10 +166,10 @@ impl Validators { pub fn check_broker_config(broker_config: &HashMap) -> Result<()> { if let Some(broker_permission) = broker_config.get("brokerPermission") { if !PermName::is_valid(broker_permission.parse().unwrap()) { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "brokerPermission value: {} is invalid.", broker_permission - )))); + )); } } diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index 3f3802a6..b766cd18 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -83,6 +83,7 @@ use crate::hook::consume_message_hook::ConsumeMessageHook; use crate::hook::filter_message_hook::FilterMessageHook; use crate::implementation::communication_mode::CommunicationMode; use crate::implementation::mq_client_manager::MQClientManager; +use crate::mq_client_err; use crate::producer::mq_producer::MQProducer; use crate::Result; @@ -354,21 +355,17 @@ impl DefaultMQPushConsumerImpl { *self.service_state = ServiceState::Running; } ServiceState::Running => { - return Err(MQClientError::MQClientErr(ClientErr::new( - "The PushConsumer service state is Running", - ))); + return mq_client_err!("The PushConsumer service state is Running"); } ServiceState::ShutdownAlready => { - return Err(MQClientError::MQClientErr(ClientErr::new( - "The PushConsumer service state is ShutdownAlready", - ))); + return mq_client_err!("The PushConsumer service state is ShutdownAlready"); } ServiceState::StartFailed => { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The PushConsumer service state not OK, maybe started once,{:?},{}", *self.service_state, FAQUrl::suggest_todo(FAQUrl::CLIENT_SERVICE_NOT_OK) - )))); + )); } } self.update_topic_subscribe_info_when_subscription_changed() @@ -448,18 +445,18 @@ impl DefaultMQPushConsumerImpl { fn check_config(&mut self) -> Result<()> { Validators::check_group(self.consumer_config.consumer_group.as_str())?; if self.consumer_config.consumer_group.is_empty() { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumer_group is empty, {}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.consumer_group == DEFAULT_CONSUMER_GROUP { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumer_group can not equal {} please specify another one.{}", DEFAULT_CONSUMER_GROUP, FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self @@ -467,17 +464,17 @@ impl DefaultMQPushConsumerImpl { .allocate_message_queue_strategy .is_none() { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "allocate_message_queue_strategy is null{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.message_listener.is_none() { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "messageListener is null{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self @@ -495,120 +492,120 @@ impl DefaultMQPushConsumerImpl { .message_listener_concurrently .is_none() { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "messageListener must be instanceof MessageListenerOrderly or \ MessageListenerConcurrently{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } let consume_thread_min = self.consumer_config.consume_thread_min; let consume_thread_max = self.consumer_config.consume_thread_max; if !(1..=1000).contains(&consume_thread_min) { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumeThreadMin Out of range [1, 1000]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if !(1..=1000).contains(&consume_thread_max) { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumeThreadMax Out of range [1, 1000]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if consume_thread_min > consume_thread_max { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumeThreadMin ({}) is larger than consumeThreadMax ({})", consume_thread_min, consume_thread_max - )))); + )); } if self.consumer_config.consume_concurrently_max_span < 1 || self.consumer_config.consume_concurrently_max_span > 65535 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumeConcurrentlyMaxSpan Out of range [1, 65535]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pull_threshold_for_queue < 1 || self.consumer_config.pull_threshold_for_queue > 65535 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "pullThresholdForQueue Out of range [1, 65535]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pull_threshold_for_topic != -1 && (self.consumer_config.pull_threshold_for_topic < 1 || self.consumer_config.pull_threshold_for_topic > 6553500) { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "pullThresholdForTopic Out of range [1, 65535]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pull_threshold_size_for_queue < 1 || self.consumer_config.pull_threshold_size_for_queue > 1024 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "pullThresholdSizeForQueue Out of range [1, 1024]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pull_threshold_size_for_topic != -1 && (self.consumer_config.pull_threshold_size_for_topic < 1 || self.consumer_config.pull_threshold_size_for_topic > 102400) { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "pullThresholdSizeForTopic Out of range [1, 102400]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pull_interval > 65535 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "pullInterval Out of range [0, 65535]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.consume_message_batch_max_size < 1 || self.consumer_config.consume_message_batch_max_size > 1024 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "consumeMessageBatchMaxSize Out of range [1, 1024]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pull_batch_size < 1 || self.consumer_config.pull_batch_size > 1024 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "pullBatchSize Out of range [1, 1024]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pop_invisible_time < MIN_POP_INVISIBLE_TIME || self.consumer_config.pop_invisible_time > MAX_POP_INVISIBLE_TIME { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "popInvisibleTime Out of range [{}, {}]{}", MIN_POP_INVISIBLE_TIME, MAX_POP_INVISIBLE_TIME, FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } if self.consumer_config.pop_batch_nums < 1 || self.consumer_config.pop_batch_nums > 32 { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "popBatchNums Out of range [1, 32]{}", FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - )))); + )); } Ok(()) @@ -673,10 +670,7 @@ impl DefaultMQPushConsumerImpl { ) -> Result<()> { let subscription_data = FilterAPI::build_subscription_data(&topic, &sub_expression); if let Err(e) = subscription_data { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( - "buildSubscriptionData exception, {}", - e - )))); + return mq_client_err!(format!("buildSubscriptionData exception, {}", e)); } let subscription_data = subscription_data.unwrap(); self.rebalance_impl @@ -974,11 +968,11 @@ impl DefaultMQPushConsumerImpl { fn make_sure_state_ok(&self) -> Result<()> { if *self.service_state != ServiceState::Running { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The consumer service state not OK, {},{}", *self.service_state, FAQUrl::suggest_todo(FAQUrl::CLIENT_SERVICE_NOT_OK) - )))); + )); } Ok(()) } diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs index ae9f2e81..ef631516 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs @@ -36,8 +36,6 @@ use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader; use rocketmq_rust::ArcMut; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError::MQClientErr; use crate::consumer::consumer_impl::pull_request_ext::PullResultExt; use crate::consumer::pull_callback::PullCallback; use crate::consumer::pull_status::PullStatus; @@ -46,6 +44,7 @@ use crate::hook::filter_message_context::FilterMessageContext; use crate::hook::filter_message_hook::FilterMessageHook; use crate::implementation::communication_mode::CommunicationMode; use crate::implementation::mq_client_api_impl::MQClientAPIImpl; +use crate::mq_client_err; use crate::Result; #[derive(Clone)] @@ -300,13 +299,13 @@ impl PullAPIWrapper { if !ExpressionType::is_tag_type(Some(expression_type.as_str())) && find_broker_result.broker_version < RocketMqVersion::V410Snapshot.into() { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The broker[{}],[{}] does not support consumer to filter message by \ tag[{}]", mq.get_broker_name(), find_broker_result.broker_version, expression_type - )))); + )); } } @@ -361,10 +360,7 @@ impl PullAPIWrapper { ) .await } else { - Err(MQClientErr(ClientErr::new(format!( - "The broker[{}] not exist", - mq.get_broker_name(), - )))) + mq_client_err!(format!("The broker[{}] not exist", mq.get_broker_name(),)) } } @@ -381,17 +377,17 @@ impl PullAPIWrapper { .get(broker_addr); if let Some(vec) = vec { return vec.get(random_num() as usize % vec.len()).map_or( - Err(MQClientErr(ClientErr::new(format!( + mq_client_err!(format!( "Find Filter Server Failed, Broker Addr: {},topic:{}", broker_addr, topic - )))), + )), |v| Ok(v.clone()), ); } - Err(MQClientErr(ClientErr::new(format!( + mq_client_err!(format!( "Find Filter Server Failed, Broker Addr: {},topic:{}", broker_addr, topic - )))) + )) } } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index af23266c..71687985 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -39,8 +39,6 @@ use tracing::info; use tracing::warn; use crate::base::client_config::ClientConfig; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError; use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy; use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; @@ -52,6 +50,7 @@ use crate::consumer::consumer_impl::re_balance::Rebalance; use crate::consumer::default_mq_push_consumer::ConsumerConfig; use crate::consumer::store::read_offset_type::ReadOffsetType; use crate::factory::mq_client_instance::MQClientInstance; +use crate::mq_client_err; use crate::Result; static UNLOCK_DELAY_TIME_MILLS: Lazy = Lazy::new(|| { @@ -273,9 +272,7 @@ impl Rebalance for RebalancePushImpl { .unwrap() .upgrade(); if default_mqpush_consumer_impl.is_none() { - return Err(MQClientError::MQClientErr(ClientErr::new( - "default_mqpush_consumer_impl is none", - ))); + return mq_client_err!("default_mqpush_consumer_impl is none"); } let mut default_mqpush_consumer_impl = default_mqpush_consumer_impl.unwrap(); let offset_store = default_mqpush_consumer_impl.offset_store.as_mut().unwrap(); @@ -306,10 +303,10 @@ impl Rebalance for RebalancePushImpl { .await? } } else { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( - ResponseCode::QueryNotFound.into(), - "Failed to query consume offset from offset store", - ))); + return mq_client_err!( + ResponseCode::QueryNotFound as i32, + "Failed to query consume offset from offset store" + ); } } ConsumeFromWhere::ConsumeFromFirstOffset => { @@ -321,10 +318,10 @@ impl Rebalance for RebalancePushImpl { } else if -1 == last_offset { 0 } else { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( - ResponseCode::QueryNotFound.into(), - "Failed to query consume offset from offset store", - ))); + return mq_client_err!( + ResponseCode::QueryNotFound as i32, + "Failed to query consume offset from offset store" + ); } } ConsumeFromWhere::ConsumeFromTimestamp => { @@ -362,18 +359,18 @@ impl Rebalance for RebalancePushImpl { .await? } } else { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( - ResponseCode::QueryNotFound.into(), - "Failed to query consume offset from offset store", - ))); + return mq_client_err!( + ResponseCode::QueryNotFound as i32, + "Failed to query consume offset from offset store" + ); } } }; if result < 0 { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( - ResponseCode::SystemError.into(), - "Failed to query consume offset from offset store", - ))); + return mq_client_err!( + ResponseCode::SystemError as i32, + "Failed to query consume offset from offset store" + ); } Ok(result) } diff --git a/rocketmq-client/src/consumer/store/local_file_offset_store.rs b/rocketmq-client/src/consumer/store/local_file_offset_store.rs index 4e9e678d..782c57cb 100644 --- a/rocketmq-client/src/consumer/store/local_file_offset_store.rs +++ b/rocketmq-client/src/consumer/store/local_file_offset_store.rs @@ -32,14 +32,13 @@ use tokio::sync::Mutex; use tracing::error; use tracing::info; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError; use crate::consumer::store::controllable_offset::ControllableOffset; use crate::consumer::store::offset_serialize::OffsetSerialize; use crate::consumer::store::offset_serialize_wrapper::OffsetSerializeWrapper; use crate::consumer::store::offset_store::OffsetStoreTrait; use crate::consumer::store::read_offset_type::ReadOffsetType; use crate::factory::mq_client_instance::MQClientInstance; +use crate::mq_client_err; use crate::Result; static LOCAL_OFFSET_STORE_DIR: Lazy = Lazy::new(|| { @@ -89,10 +88,7 @@ impl LocalFileOffsetStore { } else { match OffsetSerialize::decode(content.as_bytes()) { Ok(value) => Ok(Some(value.into())), - Err(e) => Err(MQClientError::MQClientErr(ClientErr::new(format!( - "Failed to deserialize local offset: {}", - e - )))), + Err(e) => mq_client_err!(format!("Failed to deserialize local offset: {}", e)), } } } @@ -104,10 +100,10 @@ impl LocalFileOffsetStore { } else { match OffsetSerialize::decode(content.as_bytes()) { Ok(value) => Ok(Some(value.into())), - Err(_) => Err(MQClientError::MQClientErr(ClientErr::new(format!( + Err(_) => mq_client_err!(format!( "read local offset bak failed, content: {}", content - )))), + )), } } } diff --git a/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs b/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs index 7bdadcdc..47b32b2e 100644 --- a/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs +++ b/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs @@ -31,12 +31,12 @@ use tracing::error; use tracing::info; use tracing::warn; -use crate::client_error::ClientErr; use crate::client_error::MQClientError; use crate::consumer::store::controllable_offset::ControllableOffset; use crate::consumer::store::offset_store::OffsetStoreTrait; use crate::consumer::store::read_offset_type::ReadOffsetType; use crate::factory::mq_client_instance::MQClientInstance; +use crate::mq_client_err; use crate::Result; pub struct RemoteBrokerOffsetStore { @@ -110,10 +110,7 @@ impl RemoteBrokerOffsetStore { ) .await } else { - Err(MQClientError::MQClientErr(ClientErr::new(format!( - "broker not found, {}", - mq.get_broker_name() - )))) + mq_client_err!(format!("broker not found, {}", mq.get_broker_name())) } } } @@ -320,10 +317,7 @@ impl OffsetStoreTrait for RemoteBrokerOffsetStore { }; Ok(()) } else { - Err(MQClientError::MQClientErr(ClientErr::new(format!( - "broker not found, {}", - mq.get_broker_name() - )))) + mq_client_err!(format!("broker not found, {}", mq.get_broker_name())) } } } diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index bb6fd280..66c07b06 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -51,7 +51,6 @@ use tracing::warn; use crate::admin::mq_admin_ext_inner::MQAdminExtInner; use crate::base::client_config::ClientConfig; -use crate::client_error::ClientErr; use crate::client_error::MQClientError::MQClientErr; use crate::consumer::consumer_impl::pull_message_service::PullMessageService; use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceService; @@ -61,6 +60,7 @@ use crate::implementation::client_remoting_processor::ClientRemotingProcessor; use crate::implementation::find_broker_result::FindBrokerResult; use crate::implementation::mq_admin_impl::MQAdminImpl; use crate::implementation::mq_client_api_impl::MQClientAPIImpl; +use crate::mq_client_err; use crate::producer::default_mq_producer::DefaultMQProducer; use crate::producer::default_mq_producer::ProducerConfig; use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl; @@ -330,10 +330,10 @@ impl MQClientInstance { ServiceState::Running => {} ServiceState::ShutdownAlready => {} ServiceState::StartFailed => { - return Err(MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The Factory object[{}] has been created before, and failed.", self.client_id - )))); + )); } } Ok(()) @@ -986,7 +986,7 @@ impl MQClientInstance { the log!", subscription_data.expression_type ); - return Err(MQClientErr(ClientErr::new(desc))); + return mq_client_err!(desc); } }, } diff --git a/rocketmq-client/src/implementation/mq_admin_impl.rs b/rocketmq-client/src/implementation/mq_admin_impl.rs index 0e57257a..ee40d3d8 100644 --- a/rocketmq-client/src/implementation/mq_admin_impl.rs +++ b/rocketmq-client/src/implementation/mq_admin_impl.rs @@ -19,11 +19,10 @@ use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; use rocketmq_rust::ArcMut; use crate::base::client_config::ClientConfig; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError::MQClientErr; use crate::factory::mq_client_instance; use crate::factory::mq_client_instance::MQClientInstance; use crate::implementation::mq_client_api_impl::MQClientAPIImpl; +use crate::mq_client_err; use crate::Result; pub struct MQAdminImpl { @@ -88,10 +87,10 @@ impl MQAdminImpl { )); } } - Err(MQClientErr(ClientErr::new(format!( + mq_client_err!(format!( "Unknow why, Can not find Message Queue for this topic, {}", topic - )))) + )) } pub async fn max_offset(&mut self, mq: &MessageQueue) -> Result { diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index cdeb7423..0ae51a9a 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -84,11 +84,9 @@ use tracing::error; use tracing::warn; use crate::base::client_config::ClientConfig; -use crate::client_error::ClientErr; use crate::client_error::MQBrokerErr; use crate::client_error::MQClientError; use crate::client_error::MQClientError::MQClientBrokerError; -use crate::client_error::MQClientError::MQClientErr; use crate::consumer::consumer_impl::pull_request_ext::PullResultExt; use crate::consumer::pull_callback::PullCallback; use crate::consumer::pull_result::PullResult; @@ -97,6 +95,7 @@ use crate::factory::mq_client_instance::MQClientInstance; use crate::hook::send_message_context::SendMessageContext; use crate::implementation::client_remoting_processor::ClientRemotingProcessor; use crate::implementation::communication_mode::CommunicationMode; +use crate::mq_client_err; use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl; use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo; use crate::producer::send_callback::SendMessageCallback; @@ -254,16 +253,16 @@ impl MQClientAPIImpl { } } _ => { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( + return mq_client_err!( code, - result.remark().cloned().unwrap_or_default().to_string(), - ))) + result.remark().cloned().unwrap_or_default().to_string() + ) } } - Err(MQClientError::MQClientErr(ClientErr::new_with_code( + mq_client_err!( code, - result.remark().cloned().unwrap_or_default().to_string(), - ))) + result.remark().cloned().unwrap_or_default().to_string() + ) } Err(err) => Err(MQClientError::RemotingError(err)), } @@ -723,10 +722,10 @@ impl MQClientAPIImpl { ) .await?; if ResponseCode::from(response.code()) != ResponseCode::Success { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( + return mq_client_err!( response.code(), - response.remark().map_or("".to_string(), |s| s.to_string()), - ))); + response.remark().map_or("".to_string(), |s| s.to_string()) + ); } Ok(()) } @@ -762,9 +761,9 @@ impl MQClientAPIImpl { if let Some(body) = response.body() { return match GetConsumerListByGroupResponseBody::decode(body) { Ok(value) => Ok(value.consumer_id_list), - Err(e) => Err(MQClientError::MQClientErr(ClientErr::new( - response.remark().map_or("".to_string(), |s| s.to_string()), - ))), + Err(e) => mq_client_err!(response + .remark() + .map_or("".to_string(), |s| s.to_string())), }; } } @@ -1261,10 +1260,10 @@ impl MQClientAPIImpl { ) .await?; if ResponseCode::from(response.code()) != ResponseCode::Success { - return Err(MQClientErr(ClientErr::new_with_code( + return mq_client_err!( response.code(), - response.remark().cloned().unwrap_or_default().to_string(), - ))); + response.remark().cloned().unwrap_or_default().to_string() + ); } Ok(()) } diff --git a/rocketmq-client/src/producer/default_mq_producer.rs b/rocketmq-client/src/producer/default_mq_producer.rs index 6b878987..db14154a 100644 --- a/rocketmq-client/src/producer/default_mq_producer.rs +++ b/rocketmq-client/src/producer/default_mq_producer.rs @@ -39,8 +39,7 @@ use tracing::error; use crate::base::client_config::ClientConfig; use crate::base::validators::Validators; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError::MQClientErr; +use crate::mq_client_err; use crate::producer::default_mq_produce_builder::DefaultMQProducerBuilder; use crate::producer::mq_producer::MQProducer; use crate::producer::produce_accumulator::ProduceAccumulator; @@ -491,9 +490,7 @@ impl DefaultMQProducer { } Err(err) => { error!("Failed to initiate the MessageBatch: {:?}", err); - Err(MQClientErr(ClientErr::new( - "Failed to initiate the MessageBatch", - ))) + mq_client_err!("Failed to initiate the MessageBatch") } } } diff --git a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs index fc817f4d..9bcefa82 100644 --- a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs +++ b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs @@ -61,7 +61,6 @@ use crate::base::client_config::ClientConfig; use crate::base::validators::Validators; use crate::client_error::ClientErr; use crate::client_error::MQClientError; -use crate::client_error::MQClientError::MQClientErr; use crate::client_error::MQClientError::RemotingTooMuchRequestError; use crate::client_error::MQClientError::RequestTimeoutError; use crate::client_error::RequestTimeoutErr; @@ -78,6 +77,7 @@ use crate::implementation::mq_client_manager::MQClientManager; use crate::latency::mq_fault_strategy::MQFaultStrategy; use crate::latency::resolver::Resolver; use crate::latency::service_detector::ServiceDetector; +use crate::mq_client_err; use crate::producer::default_mq_producer::ProducerConfig; use crate::producer::local_transaction_state::LocalTransactionState; use crate::producer::message_queue_selector::MessageQueueSelectorFn; @@ -259,11 +259,11 @@ impl DefaultMQProducerImpl { Validators::check_message(Some(&msg), self.producer_config.as_ref())?; if msg.get_topic() != mq.get_topic() { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "message topic [{}] is not equal with message queue topic [{}]", msg.get_topic(), mq.get_topic() - )))); + )); } let cost_time = begin_start_time.elapsed().as_millis() as u64; if timeout < cost_time { @@ -426,16 +426,11 @@ impl DefaultMQProducerImpl { ) .await; } - return Err(MQClientError::MQClientErr(ClientErr::new( - "select message queue return null.", - ))); + return mq_client_err!("select message queue return null."); } } self.validate_name_server_setting()?; - Err(MQClientError::MQClientErr(ClientErr::new(format!( - "No route info for this topic, {}", - msg.get_topic() - )))) + mq_client_err!(format!("No route info for this topic, {}", msg.get_topic())) } #[inline] @@ -876,59 +871,41 @@ impl DefaultMQProducerImpl { return if let Some(err) = exception { match err { MQClientError::MQClientErr(_) => { - Err(MQClientErr(ClientErr::new_with_code( - ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, - info, - ))) + mq_client_err!(ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, info) } RemotingTooMuchRequestError(_) => { - Err(MQClientErr(ClientErr::new_with_code( - ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, - info, - ))) + mq_client_err!(ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, info) } MQClientError::MQClientBrokerError(_) => { - Err(MQClientErr(ClientErr::new_with_code( - ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, - info, - ))) + mq_client_err!(ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, info) } MQClientError::RequestTimeoutError(_) => { - Err(MQClientErr(ClientErr::new_with_code( - ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, - info, - ))) + mq_client_err!(ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, info) } MQClientError::OffsetNotFoundError(_, _, _) => { - Err(MQClientErr(ClientErr::new_with_code( - ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, - info, - ))) + mq_client_err!(ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, info) } MQClientError::RemotingError(_) => { - Err(MQClientErr(ClientErr::new_with_code( - ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, - info, - ))) + mq_client_err!(ClientErrorCode::BROKER_NOT_EXIST_EXCEPTION, info) } _ => { unimplemented!("not support error type"); } } } else { - Err(MQClientErr(ClientErr::new(info))) + mq_client_err!(info) }; } } self.validate_name_server_setting()?; - Err(MQClientErr(ClientErr::new_with_code( + mq_client_err!( ClientErrorCode::NOT_FOUND_TOPIC_EXCEPTION, format!( "No route info of this topic:{},{}", topic, FAQUrl::suggest_todo(FAQUrl::NO_TOPIC_ROUTE_INFO) - ), - ))) + ) + ) } #[inline] @@ -987,10 +964,7 @@ impl DefaultMQProducerImpl { } if broker_addr.is_none() { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( - "The broker[{}] not exist", - broker_name, - )))); + return mq_client_err!(format!("The broker[{}] not exist", broker_name,)); } let mut broker_addr = broker_addr.unwrap(); broker_addr = mix_all::broker_vip_channel( @@ -1284,13 +1258,13 @@ impl DefaultMQProducerImpl { .get_mq_client_api_impl(); let ns_list = binding.get_name_server_address_list(); if ns_list.is_empty() { - return Err(MQClientError::MQClientErr(ClientErr::new_with_code( + return mq_client_err!( ClientErrorCode::NO_NAME_SERVER_EXCEPTION, format!( "No name remoting_server address, please set it. {}", FAQUrl::suggest_todo(FAQUrl::NAME_SERVER_ADDR_NOT_EXIST_URL) - ), - ))); + ) + ); } Ok(()) } @@ -1338,11 +1312,11 @@ impl DefaultMQProducerImpl { fn make_sure_state_ok(&self) -> Result<()> { if self.service_state != ServiceState::Running { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The producer service state not OK, {:?} {}", self.service_state, FAQUrl::suggest_todo(FAQUrl::CLIENT_SERVICE_NOT_OK) - )))); + )); } Ok(()) } @@ -1392,15 +1366,11 @@ impl DefaultMQProducerImpl { if let Some(message_queue) = message_queue { return Ok(message_queue); } - return Err(MQClientError::MQClientErr(ClientErr::new( - "select message queue return None.", - ))); + return mq_client_err!("select message queue return None."); } } self.validate_name_server_setting(); - Err(MQClientErr(ClientErr::new( - "select message queue return null.", - ))) + mq_client_err!("select message queue return null.") } pub async fn send_with_selector_timeout( @@ -1816,13 +1786,13 @@ impl DefaultMQProducerImpl { ), ))) } else { - Err(MQClientErr(ClientErr::new(format!( + mq_client_err!(format!( "send request message to <{}> fail, {}", topic, request_response_future .get_cause() .map_or("".to_string(), |cause| { cause.to_string() }) - )))) + )) } } @@ -1897,10 +1867,7 @@ impl DefaultMQProducerImpl { ); let result = self.send(&mut msg).await; if let Err(e) = result { - return Err(MQClientErr(ClientErr::new(format!( - "send message in transaction error, {}", - e - )))); + return mq_client_err!(format!("send message in transaction error, {}", e)); } let send_result = result.unwrap().expect("send result is none"); let local_transaction_state = match send_result.send_status { @@ -2254,12 +2221,12 @@ impl DefaultMQProducerImpl { .await; if !register_ok { self.service_state = ServiceState::CreateJust; - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The producer group[{}] has been created before, specify another name \ please. {}", self.producer_config.producer_group(), FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL) - )))); + )); } if start_factory { let cloned = self.client_instance.as_mut().cloned().unwrap(); @@ -2272,21 +2239,17 @@ impl DefaultMQProducerImpl { self.service_state = ServiceState::Running; } ServiceState::Running => { - return Err(MQClientError::MQClientErr(ClientErr::new( - "The producer service state is Running", - ))); + return mq_client_err!("The producer service state is Running"); } ServiceState::ShutdownAlready => { - return Err(MQClientError::MQClientErr(ClientErr::new( - "The producer service state is ShutdownAlready", - ))); + return mq_client_err!("The producer service state is ShutdownAlready"); } ServiceState::StartFailed => { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The producer service state not OK, maybe started once,{:?},{}", self.service_state, FAQUrl::suggest_todo(FAQUrl::CLIENT_SERVICE_NOT_OK) - )))); + )); } } Ok(()) @@ -2304,11 +2267,11 @@ impl DefaultMQProducerImpl { fn check_config(&self) -> Result<()> { Validators::check_group(self.producer_config.producer_group())?; if self.producer_config.producer_group() == DEFAULT_PRODUCER_GROUP { - return Err(MQClientError::MQClientErr(ClientErr::new(format!( + return mq_client_err!(format!( "The specified group name[{}] is equal to default group, please specify another \ one.", DEFAULT_PRODUCER_GROUP - )))); + )); } Ok(()) } diff --git a/rocketmq-client/src/utils/message_util.rs b/rocketmq-client/src/utils/message_util.rs index defafa0f..e55a23fb 100644 --- a/rocketmq-client/src/utils/message_util.rs +++ b/rocketmq-client/src/utils/message_util.rs @@ -22,9 +22,8 @@ use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all; use rocketmq_common::MessageAccessor::MessageAccessor; -use crate::client_error::ClientErr; -use crate::client_error::MQClientError::MQClientErr; use crate::common::client_error_code::ClientErrorCode; +use crate::mq_client_err; use crate::Result; pub struct MessageUtil; @@ -73,13 +72,13 @@ impl MessageUtil { } Ok(reply_message) } else { - Err(MQClientErr(ClientErr::new_with_code( + mq_client_err!( ClientErrorCode::CREATE_REPLY_MESSAGE_EXCEPTION, format!( "create reply message fail, requestMessage error, property[{}] is null.", MessageConst::PROPERTY_CLUSTER - ), - ))) + ) + ) } } diff --git a/rocketmq/src/shutdown.rs b/rocketmq/src/shutdown.rs index 3ad2ca15..81e7a921 100644 --- a/rocketmq/src/shutdown.rs +++ b/rocketmq/src/shutdown.rs @@ -65,7 +65,6 @@ where #[cfg(test)] mod tests { - use super::*; #[tokio::test]