Skip to content

[ISSUE #983]🎨Optimize ConnectionHandler logic🔥 #984

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rocketmq-remoting/src/protocol/remoting_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ impl RemotingCommand {
self
}

#[inline]
pub fn set_opaque(mut self, opaque: i32) -> Self {
self.opaque = opaque;
self
}

#[inline]
pub fn set_opaque_mut(&mut self, opaque: i32) {
self.opaque = opaque;
}
Expand Down
155 changes: 93 additions & 62 deletions rocketmq-remoting/src/remoting_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl<RP> ConnectionHandler<RP> {
impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
async fn handle(&mut self) -> Result<()> {
while !self.shutdown.is_shutdown {
//Get the next frame from the connection.
let frame = tokio::select! {
res = self.connection_handler_context.channel.connection.reader.next() => res,
_ = self.shutdown.recv() =>{
Expand Down Expand Up @@ -135,38 +136,100 @@ impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
}
continue;
}

//handle request
let mut exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) {
let opaque = cmd.opaque();
let oneway_rpc = cmd.is_oneway_rpc();
//before handle request hooks
let exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) {
Ok(_) => None,
Err(error) => Some(error),
};
let opaque = cmd.opaque();
let oneway_rpc = cmd.is_oneway_rpc();
let mut response = if exception.is_some() {
Some(RemotingCommand::create_remoting_command(
ResponseCode::SystemError,
))
} else {
//handle error if return have
match self.handle_error(oneway_rpc, opaque, exception).await {
HandleErrorResult::Continue => continue,
HandleErrorResult::ReturnMethod => return Ok(()),
HandleErrorResult::GoHead => {}
}
Comment on lines +146 to +151
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduce code duplication by refactoring error handling

The error handling blocks at lines 147-151 and 173-176 are nearly identical. Consider refactoring this repeated logic into a separate function to enhance maintainability and reduce duplication.

Example refactored code:

async fn process_handle_error(
    &mut self,
    oneway_rpc: bool,
    opaque: i32,
    exception: Option<Error>,
) -> Result<bool> {
    match self.handle_error(oneway_rpc, opaque, exception).await {
        HandleErrorResult::Continue => Ok(true),
        HandleErrorResult::ReturnMethod => Ok(false),
        HandleErrorResult::GoHead => Ok(false),
    }
}

Then, replace the duplicated code with:

if self.process_handle_error(oneway_rpc, opaque, exception).await? {
    continue;
}

Also applies to: 172-176


let mut response = {
let channel = self.channel.clone();
let ctx = ArcRefCellWrapper::downgrade(&self.connection_handler_context);
tokio::select! {
result = self.request_processor.process_request(channel,ctx,cmd) => result?,
result = self.request_processor.process_request(channel,ctx,cmd) => match result{
Ok(value) => value,
Err(_err) => Some(RemotingCommand::create_response_command_with_code(
ResponseCode::SystemError,
)),
},
Comment on lines +157 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log errors from process_request for better debugging

In the error case of process_request, the error is currently ignored (Err(_err)). Logging the error can aid in debugging and provide more context if issues arise.

Modify the code to log the error:

Err(err) => {
+   error!("Failed to process request: {}", err);
    Some(RemotingCommand::create_response_command_with_code(
        ResponseCode::SystemError,
    ))
},
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
result = self.request_processor.process_request(channel,ctx,cmd) => match result{
Ok(value) => value,
Err(_err) => Some(RemotingCommand::create_response_command_with_code(
ResponseCode::SystemError,
)),
},
result = self.request_processor.process_request(channel,ctx,cmd) => match result{
Ok(value) => value,
Err(err) => {
error!("Failed to process request: {}", err);
Some(RemotingCommand::create_response_command_with_code(
ResponseCode::SystemError,
))
},
},

}
};
if response.is_none() {

let exception =
match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
Ok(_) => None,
Err(error) => Some(error),
};
Comment on lines +166 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect use of do_before_rpc_hooks; should be do_after_rpc_hooks

At line 167, after processing the request and generating the response, you are calling do_before_rpc_hooks on the response. This seems incorrect, as do_before_rpc_hooks is intended for processing before handling the request. You should use do_after_rpc_hooks to process hooks after handling the response.

Apply this change:

- let exception = match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
+ let exception = match self.do_after_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
    Ok(_) => None,
    Err(error) => Some(error),
};
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let exception =
match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
Ok(_) => None,
Err(error) => Some(error),
};
let exception =
match self.do_after_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
Ok(_) => None,
Err(error) => Some(error),
};

Possible panic due to unwrap() on response when it is None

At line 167, you are calling response.as_mut().unwrap(). If response is None, this will cause a panic at runtime. Since you check for response.is_none() later at line 177, it's possible for response to be None.

Modify the code to safely handle the None case:

- let exception = match self.do_after_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
+ if let Some(ref mut resp) = response {
+     let exception = match self.do_after_rpc_hooks(&self.channel, resp) {
+         Ok(_) => None,
+         Err(error) => Some(error),
+     };
+     match self.handle_error(oneway_rpc, opaque, exception).await {
+         HandleErrorResult::Continue => continue,
+         HandleErrorResult::ReturnMethod => return Ok(()),
+         HandleErrorResult::GoHead => {}
+     }
+ } else if oneway_rpc {
+     continue;
+ } else {
+     // Handle the case where response is None and it's not a one-way RPC
+     // You might need to send an error response or log an error
+     // For example:
+     error!("Response is None for a non-one-way RPC");
+     return Ok(());
+ }

Committable suggestion was skipped due to low confidence.


match self.handle_error(oneway_rpc, opaque, exception).await {
HandleErrorResult::Continue => continue,
HandleErrorResult::ReturnMethod => return Ok(()),
HandleErrorResult::GoHead => {}
}
if response.is_none() || oneway_rpc {
continue;
}
exception = match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
Ok(_) => None,
Err(error) => Some(error),
};
let response = response.unwrap();
tokio::select! {
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
Ok(_) =>{},
Err(err) => {
match err {
Error::Io(io_error) => {
error!("connection disconnect: {}", io_error);
return Ok(())
}
_ => { error!("send response failed: {}", err);}
}
},
},
}
}
Ok(())
}

if let Some(exception_inner) = exception {
match exception_inner {
Error::AbortProcessException(code, message) => {
async fn handle_error(
&mut self,
oneway_rpc: bool,
opaque: i32,
exception: Option<Error>,
) -> HandleErrorResult {
if let Some(exception_inner) = exception {
match exception_inner {
Error::AbortProcessException(code, message) => {
if oneway_rpc {
return HandleErrorResult::Continue;
}
let response =
RemotingCommand::create_response_command_with_code_remark(code, message);
tokio::select! {
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
Ok(_) =>{},
Err(err) => {
match err {
Error::Io(io_error) => {
error!("send response failed: {}", io_error);
return HandleErrorResult::ReturnMethod;
}
_ => { error!("send response failed: {}", err);}
}
},
},
}
}
_ => {
if !oneway_rpc {
let response = RemotingCommand::create_response_command_with_code_remark(
code, message,
ResponseCode::SystemError,
exception_inner.to_string(),
);
tokio::select! {
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
Expand All @@ -175,61 +238,29 @@ impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
match err {
Error::Io(io_error) => {
error!("send response failed: {}", io_error);
return Ok(())
return HandleErrorResult::ReturnMethod;
}
_ => { error!("send response failed: {}", err);}
}
},
},
}
}
_ => {
if !oneway_rpc {
let response =
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
exception_inner.to_string(),
);
tokio::select! {
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
Ok(_) =>{},
Err(err) => {
match err {
Error::Io(io_error) => {
error!("send response failed: {}", io_error);
return Ok(())
}
_ => { error!("send response failed: {}", err);}
}
},
},
}
}
}
}
continue;
}

let response = response.unwrap();
tokio::select! {
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
Ok(_) =>{},
Err(err) => {
match err {
Error::Io(io_error) => {
error!("send response failed: {}", io_error);
return Ok(())
}
_ => { error!("send response failed: {}", err);}
}
},
},
}
HandleErrorResult::Continue
} else {
HandleErrorResult::GoHead
}
Ok(())
}
}

enum HandleErrorResult {
Continue,
ReturnMethod,
GoHead,
}

/// Server listener state. Created in the `run` call. It includes a `run` method
/// which performs the TCP listening and initialization of per-connection state.
struct ConnectionListener<RP> {
Expand Down
Loading