-
Notifications
You must be signed in to change notification settings - Fork 143
[ISSUE #2338]🤡Add RequestTask for rust🧑💻 #2406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughA new module Changes
Assessment against linked issues
Suggested Labels
Suggested Reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2406 +/- ##
==========================================
- Coverage 28.61% 28.60% -0.02%
==========================================
Files 507 508 +1
Lines 73337 73374 +37
==========================================
Hits 20989 20989
- Misses 52348 52385 +37 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (1)
rocketmq-remoting/src/base/request_task.rs (1)
36-49
: Enhance constructor with validation and error handling.The constructor should validate inputs and handle potential error cases:
- Validate channel state
- Consider adding optional parameters for initial stop state
- Add builder pattern for better construction flexibility
impl RequestTask { + /// Creates a new RequestTask with the given parameters. + /// + /// # Arguments + /// * `runnable` - The function to execute + /// * `channel` - Communication channel + /// * `request` - Original request command + /// + /// # Returns + /// Result<RequestTask, Error> indicating success or failure pub fn new( runnable: Arc<dyn Fn() + Send + Sync>, channel: Channel, request: RemotingCommand, - ) -> Self { + ) -> Result<Self, Box<dyn std::error::Error>> { + if !channel.is_active() { + return Err("Channel is not active".into()); + } + Self { runnable, create_timestamp: get_current_millis(), channel, request, stop_run: Arc::new(parking_lot::Mutex::new(false)), - } + }.into() }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-remoting/src/base.rs
(1 hunks)rocketmq-remoting/src/base/request_task.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- rocketmq-remoting/src/base.rs
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: test
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build
- GitHub Check: auto-approve
- GitHub Check: build (ubuntu-latest, stable)
pub async fn return_response(&self, _code: i32, _remark: String) { | ||
unimplemented!("return_response") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement return_response method.
The return_response
method is currently unimplemented. This could lead to runtime panics.
- pub async fn return_response(&self, _code: i32, _remark: String) {
- unimplemented!("return_response")
+ pub async fn return_response(&self, code: i32, remark: String) -> Result<(), Box<dyn std::error::Error>> {
+ let response = RemotingCommand::create_response_command(code)?;
+ response.set_remark(remark);
+ self.channel.write_and_flush(response).await
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub async fn return_response(&self, _code: i32, _remark: String) { | |
unimplemented!("return_response") | |
} | |
pub async fn return_response(&self, code: i32, remark: String) -> Result<(), Box<dyn std::error::Error>> { | |
let response = RemotingCommand::create_response_command(code)?; | |
response.set_remark(remark); | |
self.channel.write_and_flush(response).await | |
} |
pub struct RequestTask { | ||
runnable: Arc<dyn Fn() + Send + Sync>, | ||
create_timestamp: u64, | ||
channel: Channel, | ||
request: RemotingCommand, | ||
stop_run: Arc<parking_lot::Mutex<bool>>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add documentation and consider error handling improvements.
The RequestTask
struct could benefit from the following improvements:
- Add documentation comments (
///
) explaining the purpose of the struct and each field - Consider using
Result
type for the runnable to handle potential errors - Consider using
Arc<AsyncFn>
instead ofArc<Fn>
for better async support
+/// A task that can be executed asynchronously with built-in execution control.
pub struct RequestTask {
+ /// The async function to be executed
- runnable: Arc<dyn Fn() + Send + Sync>,
+ runnable: Arc<dyn Fn() -> Result<(), Box<dyn std::error::Error>> + Send + Sync>,
+ /// Timestamp when the task was created
create_timestamp: u64,
+ /// Communication channel
channel: Channel,
+ /// Original request command
request: RemotingCommand,
+ /// Flag to control task execution
stop_run: Arc<parking_lot::Mutex<bool>>,
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub struct RequestTask { | |
runnable: Arc<dyn Fn() + Send + Sync>, | |
create_timestamp: u64, | |
channel: Channel, | |
request: RemotingCommand, | |
stop_run: Arc<parking_lot::Mutex<bool>>, | |
} | |
/// A task that can be executed asynchronously with built-in execution control. | |
pub struct RequestTask { | |
/// The async function to be executed | |
runnable: Arc<dyn Fn() -> Result<(), Box<dyn std::error::Error>> + Send + Sync>, | |
/// Timestamp when the task was created | |
create_timestamp: u64, | |
/// Communication channel | |
channel: Channel, | |
/// Original request command | |
request: RemotingCommand, | |
/// Flag to control task execution | |
stop_run: Arc<parking_lot::Mutex<bool>>, | |
} |
impl Future for RequestTask { | ||
type Output = (); | ||
|
||
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
if !self.is_stop_run() { | ||
(self.runnable)(); | ||
return Poll::Ready(()); | ||
} | ||
Poll::Pending | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve Future implementation with proper error handling and waking mechanism.
The Future implementation needs several improvements:
- Handle errors from runnable
- Add waking mechanism when stop_run changes
- Prevent infinite pending state
- Consider using proper async execution
-impl Future for RequestTask {
+impl Future for RequestTask {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.is_stop_run() {
- (self.runnable)();
- return Poll::Ready(());
+ match (self.runnable)() {
+ Ok(_) => Poll::Ready(()),
+ Err(e) => {
+ log::error!("Task execution failed: {}", e);
+ Poll::Ready(())
+ }
+ }
+ } else {
+ // Register waker to be notified when stop_run changes
+ let waker = cx.waker().clone();
+ let mut stop_run = self.stop_run.lock();
+ if *stop_run {
+ // Store waker to be called when stop_run is set to false
+ // This requires adding a waker field to the struct
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
}
- Poll::Pending
}
}
Committable suggestion skipped: line range outside the PR's diff.
pub fn set_stop_run(&self, stop_run: bool) { | ||
let mut stop_run_lock = self.stop_run.lock(); | ||
*stop_run_lock = stop_run; | ||
} | ||
|
||
pub fn get_create_timestamp(&self) -> u64 { | ||
self.create_timestamp | ||
} | ||
|
||
pub fn is_stop_run(&self) -> bool { | ||
*self.stop_run.lock() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add deadlock prevention and documentation to utility methods.
The state management methods need documentation and deadlock prevention:
- Consider using a timeout for lock acquisition
- Add documentation for public methods
- Consider using atomic types for simple boolean flags
impl RequestTask {
+ /// Sets the stop flag to control task execution
+ ///
+ /// # Arguments
+ /// * `stop_run` - New value for the stop flag
pub fn set_stop_run(&self, stop_run: bool) {
- let mut stop_run_lock = self.stop_run.lock();
+ if let Some(mut stop_run_lock) = self.stop_run.try_lock_for(std::time::Duration::from_secs(1)) {
+ *stop_run_lock = stop_run;
+ } else {
+ log::warn!("Failed to acquire lock for stop_run flag");
+ }
- *stop_run_lock = stop_run;
}
+ /// Returns the creation timestamp of the task
pub fn get_create_timestamp(&self) -> u64 {
self.create_timestamp
}
+ /// Returns the current state of the stop flag
pub fn is_stop_run(&self) -> bool {
- *self.stop_run.lock()
+ self.stop_run.try_lock().map(|guard| *guard).unwrap_or(true)
}
Committable suggestion skipped: line range outside the PR's diff.
impl RequestTask { | ||
pub async fn run(&self) { | ||
if !self.is_stop_run() { | ||
(self.runnable)(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling and timeout to run method.
The run method needs better error handling and timeout mechanism:
- Handle potential errors from runnable
- Add timeout for long-running tasks
- Add logging for better observability
impl RequestTask {
- pub async fn run(&self) {
+ pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
if !self.is_stop_run() {
- (self.runnable)();
+ tokio::select! {
+ result = tokio::task::spawn_blocking(|| (self.runnable)()) => {
+ match result {
+ Ok(Ok(_)) => Ok(()),
+ Ok(Err(e)) => {
+ log::error!("Task execution failed: {}", e);
+ Err(e)
+ },
+ Err(e) => {
+ log::error!("Task panicked: {}", e);
+ Err("Task panicked".into())
+ }
+ }
+ }
+ _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
+ log::warn!("Task timed out");
+ Err("Task timed out".into())
+ }
+ }
+ } else {
+ Ok(())
}
}
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #2338
closes #2335
closes #1917
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
RequestTask
module to support asynchronous task management