-
Notifications
You must be signed in to change notification settings - Fork 143
[ISSUE #1527]⚡️Optimize SendMessageProcessor handle error logic #1528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes in this pull request focus on enhancing error handling within the Changes
Assessment against linked issues
Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1528 +/- ##
==========================================
- Coverage 24.86% 24.86% -0.01%
==========================================
Files 450 450
Lines 59494 59506 +12
==========================================
Hits 14795 14795
- Misses 44699 44711 +12 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/send_message_processor.rs (2)
146-155
: Simplify await calls by removing unnecessaryOk()
wrappingThe code has been simplified by removing the unnecessary
Ok()
wrapping around theawait
calls for thesend_message
method. This improves code readability and clarity.
158-167
: Simplify await calls by removing unnecessaryOk()
wrappingSimilarly, the
await
call for thesend_batch_message
method no longer wraps the result inOk()
, which enhances readability.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
rocketmq-broker/src/processor/send_message_processor.rs
(16 hunks)rocketmq-common/src/common/message/message_ext_broker_inner.rs
(1 hunks)
🔇 Additional comments (2)
rocketmq-broker/src/processor/send_message_processor.rs (2)
219-219
: Enhance error propagation by updating method return types
Updating the return types of send_batch_message
and send_message
to crate::Result<Option<RemotingCommand>>
allows for better error handling and propagation throughout the code.
Also applies to: 429-429
373-374
: Ensure proper error handling of asynchronous tasks
By adding .map_err(|e| RemotingCommandError(e.to_string()))?
after the await
on the spawned tasks, the code correctly handles errors that may occur during asynchronous execution, ensuring they are propagated appropriately.
Also applies to: 568-569
rocketmq_store::base::message_status_enum::PutMessageStatus::PutOk => { | ||
send_ok = true; | ||
response.set_code_ref(RemotingSysResponseCode::Success); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::FlushDiskTimeout => { | ||
send_ok = true; | ||
response.set_code_ref(ResponseCode::FlushDiskTimeout); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::FlushSlaveTimeout => { | ||
send_ok = true; | ||
response.set_code_ref(ResponseCode::FlushSlaveTimeout); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::SlaveNotAvailable =>{ | ||
send_ok = true; | ||
response.set_code_ref(ResponseCode::SlaveNotAvailable); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::ServiceNotAvailable => { | ||
response.set_code_mut(ResponseCode::ServiceNotAvailable).set_remark_mut("service not available now. It may be caused by one of the following reasons: \ | ||
the broker's disk is full %s, messages are put to the slave, message store has been shut down, etc."); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::CreateMappedFileFailed => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("create mapped file failed, remoting_server is busy or broken."); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::MessageIllegal | | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::PropertiesSizeExceeded => { | ||
response.set_code_mut(ResponseCode::MessageIllegal).set_remark_mut("the message is illegal, maybe msg body or properties length not matched. msg body length limit B, msg properties length limit 32KB."); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::OsPageCacheBusy =>{ | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::UnknownError => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("UNKNOWN_ERROR"); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::InSyncReplicasNotEnough => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("in-sync replicas not enough"); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::LmqConsumeQueueNumExceeded => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w."); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerFlowControl => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control"); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerMsgIllegal => { | ||
response.set_code_mut(ResponseCode::MessageIllegal).set_remark_mut("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time"); | ||
}, | ||
rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerNotEnable => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("accurate timer message is not enabled, timerWheelEnable is %s"); | ||
}, | ||
_ => { | ||
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("UNKNOWN_ERROR DEFAULT"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Inconsistent send_ok
flag setting in error handling
In the handle_put_message_result
method, the send_ok
flag is set to true
in some match arms but not in others. This could lead to inconsistent behavior in subsequent logic that relies on send_ok
. Please review and ensure that send_ok
is set appropriately in all relevant cases.
@@ -62,7 +62,7 @@ impl MessageExtBrokerInner { | |||
self.version | |||
} | |||
|
|||
pub fn topic(&self) -> &str { | |||
pub fn topic(&self) -> &CheetahString { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Update method return type impacts dependent code
Changing the return type of the topic()
method to &CheetahString
may affect other parts of the codebase that rely on this method. Ensure all usages of topic()
are updated to handle the new return type, preventing potential type mismatch errors.
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Which Issue(s) This PR Fixes(Closes)
Fixes #1527
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
CheetahString
, enhancing type handling.