-
Notifications
You must be signed in to change notification settings - Fork 161
[ISSUE #983]🎨Optimize ConnectionHandler logic🔥 #984
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -106,6 +106,7 @@ impl<RP> ConnectionHandler<RP> { | |||||||||||||||||||||||||||||||
impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> { | ||||||||||||||||||||||||||||||||
async fn handle(&mut self) -> Result<()> { | ||||||||||||||||||||||||||||||||
while !self.shutdown.is_shutdown { | ||||||||||||||||||||||||||||||||
//Get the next frame from the connection. | ||||||||||||||||||||||||||||||||
let frame = tokio::select! { | ||||||||||||||||||||||||||||||||
res = self.connection_handler_context.channel.connection.reader.next() => res, | ||||||||||||||||||||||||||||||||
_ = self.shutdown.recv() =>{ | ||||||||||||||||||||||||||||||||
|
@@ -135,38 +136,100 @@ impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> { | |||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
continue; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
//handle request | ||||||||||||||||||||||||||||||||
let mut exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) { | ||||||||||||||||||||||||||||||||
let opaque = cmd.opaque(); | ||||||||||||||||||||||||||||||||
let oneway_rpc = cmd.is_oneway_rpc(); | ||||||||||||||||||||||||||||||||
//before handle request hooks | ||||||||||||||||||||||||||||||||
let exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) { | ||||||||||||||||||||||||||||||||
Ok(_) => None, | ||||||||||||||||||||||||||||||||
Err(error) => Some(error), | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
let opaque = cmd.opaque(); | ||||||||||||||||||||||||||||||||
let oneway_rpc = cmd.is_oneway_rpc(); | ||||||||||||||||||||||||||||||||
let mut response = if exception.is_some() { | ||||||||||||||||||||||||||||||||
Some(RemotingCommand::create_remoting_command( | ||||||||||||||||||||||||||||||||
ResponseCode::SystemError, | ||||||||||||||||||||||||||||||||
)) | ||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||
//handle error if return have | ||||||||||||||||||||||||||||||||
match self.handle_error(oneway_rpc, opaque, exception).await { | ||||||||||||||||||||||||||||||||
HandleErrorResult::Continue => continue, | ||||||||||||||||||||||||||||||||
HandleErrorResult::ReturnMethod => return Ok(()), | ||||||||||||||||||||||||||||||||
HandleErrorResult::GoHead => {} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let mut response = { | ||||||||||||||||||||||||||||||||
let channel = self.channel.clone(); | ||||||||||||||||||||||||||||||||
let ctx = ArcRefCellWrapper::downgrade(&self.connection_handler_context); | ||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||
result = self.request_processor.process_request(channel,ctx,cmd) => result?, | ||||||||||||||||||||||||||||||||
result = self.request_processor.process_request(channel,ctx,cmd) => match result{ | ||||||||||||||||||||||||||||||||
Ok(value) => value, | ||||||||||||||||||||||||||||||||
Err(_err) => Some(RemotingCommand::create_response_command_with_code( | ||||||||||||||||||||||||||||||||
ResponseCode::SystemError, | ||||||||||||||||||||||||||||||||
)), | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
Comment on lines
+157
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log errors from In the error case of Modify the code to log the error: Err(err) => {
+ error!("Failed to process request: {}", err);
Some(RemotingCommand::create_response_command_with_code(
ResponseCode::SystemError,
))
}, Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
if response.is_none() { | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let exception = | ||||||||||||||||||||||||||||||||
match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) { | ||||||||||||||||||||||||||||||||
Ok(_) => None, | ||||||||||||||||||||||||||||||||
Err(error) => Some(error), | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
Comment on lines
+166
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect use of At line 167, after processing the request and generating the response, you are calling Apply this change: - let exception = match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
+ let exception = match self.do_after_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
Ok(_) => None,
Err(error) => Some(error),
}; Committable suggestion
Suggested change
Possible panic due to At line 167, you are calling Modify the code to safely handle the - let exception = match self.do_after_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
+ if let Some(ref mut resp) = response {
+ let exception = match self.do_after_rpc_hooks(&self.channel, resp) {
+ Ok(_) => None,
+ Err(error) => Some(error),
+ };
+ match self.handle_error(oneway_rpc, opaque, exception).await {
+ HandleErrorResult::Continue => continue,
+ HandleErrorResult::ReturnMethod => return Ok(()),
+ HandleErrorResult::GoHead => {}
+ }
+ } else if oneway_rpc {
+ continue;
+ } else {
+ // Handle the case where response is None and it's not a one-way RPC
+ // You might need to send an error response or log an error
+ // For example:
+ error!("Response is None for a non-one-way RPC");
+ return Ok(());
+ }
|
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
match self.handle_error(oneway_rpc, opaque, exception).await { | ||||||||||||||||||||||||||||||||
HandleErrorResult::Continue => continue, | ||||||||||||||||||||||||||||||||
HandleErrorResult::ReturnMethod => return Ok(()), | ||||||||||||||||||||||||||||||||
HandleErrorResult::GoHead => {} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
if response.is_none() || oneway_rpc { | ||||||||||||||||||||||||||||||||
continue; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
exception = match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) { | ||||||||||||||||||||||||||||||||
Ok(_) => None, | ||||||||||||||||||||||||||||||||
Err(error) => Some(error), | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
let response = response.unwrap(); | ||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ | ||||||||||||||||||||||||||||||||
Ok(_) =>{}, | ||||||||||||||||||||||||||||||||
Err(err) => { | ||||||||||||||||||||||||||||||||
match err { | ||||||||||||||||||||||||||||||||
Error::Io(io_error) => { | ||||||||||||||||||||||||||||||||
error!("connection disconnect: {}", io_error); | ||||||||||||||||||||||||||||||||
return Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { error!("send response failed: {}", err);} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if let Some(exception_inner) = exception { | ||||||||||||||||||||||||||||||||
match exception_inner { | ||||||||||||||||||||||||||||||||
Error::AbortProcessException(code, message) => { | ||||||||||||||||||||||||||||||||
async fn handle_error( | ||||||||||||||||||||||||||||||||
&mut self, | ||||||||||||||||||||||||||||||||
oneway_rpc: bool, | ||||||||||||||||||||||||||||||||
opaque: i32, | ||||||||||||||||||||||||||||||||
exception: Option<Error>, | ||||||||||||||||||||||||||||||||
) -> HandleErrorResult { | ||||||||||||||||||||||||||||||||
if let Some(exception_inner) = exception { | ||||||||||||||||||||||||||||||||
match exception_inner { | ||||||||||||||||||||||||||||||||
Error::AbortProcessException(code, message) => { | ||||||||||||||||||||||||||||||||
if oneway_rpc { | ||||||||||||||||||||||||||||||||
return HandleErrorResult::Continue; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
let response = | ||||||||||||||||||||||||||||||||
RemotingCommand::create_response_command_with_code_remark(code, message); | ||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ | ||||||||||||||||||||||||||||||||
Ok(_) =>{}, | ||||||||||||||||||||||||||||||||
Err(err) => { | ||||||||||||||||||||||||||||||||
match err { | ||||||||||||||||||||||||||||||||
Error::Io(io_error) => { | ||||||||||||||||||||||||||||||||
error!("send response failed: {}", io_error); | ||||||||||||||||||||||||||||||||
return HandleErrorResult::ReturnMethod; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { error!("send response failed: {}", err);} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { | ||||||||||||||||||||||||||||||||
if !oneway_rpc { | ||||||||||||||||||||||||||||||||
let response = RemotingCommand::create_response_command_with_code_remark( | ||||||||||||||||||||||||||||||||
code, message, | ||||||||||||||||||||||||||||||||
ResponseCode::SystemError, | ||||||||||||||||||||||||||||||||
exception_inner.to_string(), | ||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ | ||||||||||||||||||||||||||||||||
|
@@ -175,61 +238,29 @@ impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> { | |||||||||||||||||||||||||||||||
match err { | ||||||||||||||||||||||||||||||||
Error::Io(io_error) => { | ||||||||||||||||||||||||||||||||
error!("send response failed: {}", io_error); | ||||||||||||||||||||||||||||||||
return Ok(()) | ||||||||||||||||||||||||||||||||
return HandleErrorResult::ReturnMethod; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { error!("send response failed: {}", err);} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { | ||||||||||||||||||||||||||||||||
if !oneway_rpc { | ||||||||||||||||||||||||||||||||
let response = | ||||||||||||||||||||||||||||||||
RemotingCommand::create_response_command_with_code_remark( | ||||||||||||||||||||||||||||||||
ResponseCode::SystemError, | ||||||||||||||||||||||||||||||||
exception_inner.to_string(), | ||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ | ||||||||||||||||||||||||||||||||
Ok(_) =>{}, | ||||||||||||||||||||||||||||||||
Err(err) => { | ||||||||||||||||||||||||||||||||
match err { | ||||||||||||||||||||||||||||||||
Error::Io(io_error) => { | ||||||||||||||||||||||||||||||||
error!("send response failed: {}", io_error); | ||||||||||||||||||||||||||||||||
return Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { error!("send response failed: {}", err);} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
continue; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let response = response.unwrap(); | ||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ | ||||||||||||||||||||||||||||||||
Ok(_) =>{}, | ||||||||||||||||||||||||||||||||
Err(err) => { | ||||||||||||||||||||||||||||||||
match err { | ||||||||||||||||||||||||||||||||
Error::Io(io_error) => { | ||||||||||||||||||||||||||||||||
error!("send response failed: {}", io_error); | ||||||||||||||||||||||||||||||||
return Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
_ => { error!("send response failed: {}", err);} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
HandleErrorResult::Continue | ||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||
HandleErrorResult::GoHead | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
enum HandleErrorResult { | ||||||||||||||||||||||||||||||||
Continue, | ||||||||||||||||||||||||||||||||
ReturnMethod, | ||||||||||||||||||||||||||||||||
GoHead, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// Server listener state. Created in the `run` call. It includes a `run` method | ||||||||||||||||||||||||||||||||
/// which performs the TCP listening and initialization of per-connection state. | ||||||||||||||||||||||||||||||||
struct ConnectionListener<RP> { | ||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduce code duplication by refactoring error handling
The error handling blocks at lines 147-151 and 173-176 are nearly identical. Consider refactoring this repeated logic into a separate function to enhance maintainability and reduce duplication.
Example refactored code:
Then, replace the duplicated code with:
Also applies to: 172-176