Skip to content

Commit 2b2680b

Browse files
committed
[ISSUE #983]🎨Optimize ConnectionHandler logic🔥
1 parent 6bdfbbc commit 2b2680b

File tree

2 files changed

+95
-62
lines changed

2 files changed

+95
-62
lines changed

rocketmq-remoting/src/protocol/remoting_command.rs

+2
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,13 @@ impl RemotingCommand {
284284
self
285285
}
286286

287+
#[inline]
287288
pub fn set_opaque(mut self, opaque: i32) -> Self {
288289
self.opaque = opaque;
289290
self
290291
}
291292

293+
#[inline]
292294
pub fn set_opaque_mut(&mut self, opaque: i32) {
293295
self.opaque = opaque;
294296
}

rocketmq-remoting/src/remoting_server/server.rs

+93-62
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ impl<RP> ConnectionHandler<RP> {
106106
impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
107107
async fn handle(&mut self) -> Result<()> {
108108
while !self.shutdown.is_shutdown {
109+
//Get the next frame from the connection.
109110
let frame = tokio::select! {
110111
res = self.connection_handler_context.channel.connection.reader.next() => res,
111112
_ = self.shutdown.recv() =>{
@@ -135,38 +136,100 @@ impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
135136
}
136137
continue;
137138
}
138-
139-
//handle request
140-
let mut exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) {
139+
let opaque = cmd.opaque();
140+
let oneway_rpc = cmd.is_oneway_rpc();
141+
//before handle request hooks
142+
let exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) {
141143
Ok(_) => None,
142144
Err(error) => Some(error),
143145
};
144-
let opaque = cmd.opaque();
145-
let oneway_rpc = cmd.is_oneway_rpc();
146-
let mut response = if exception.is_some() {
147-
Some(RemotingCommand::create_remoting_command(
148-
ResponseCode::SystemError,
149-
))
150-
} else {
146+
//handle error if return have
147+
match self.handle_error(oneway_rpc, opaque, exception).await {
148+
HandleErrorResult::Continue => continue,
149+
HandleErrorResult::ReturnMethod => return Ok(()),
150+
HandleErrorResult::GoHead => {}
151+
}
152+
153+
let mut response = {
151154
let channel = self.channel.clone();
152155
let ctx = ArcRefCellWrapper::downgrade(&self.connection_handler_context);
153156
tokio::select! {
154-
result = self.request_processor.process_request(channel,ctx,cmd) => result?,
157+
result = self.request_processor.process_request(channel,ctx,cmd) => match result{
158+
Ok(value) => value,
159+
Err(_err) => Some(RemotingCommand::create_response_command_with_code(
160+
ResponseCode::SystemError,
161+
)),
162+
},
155163
}
156164
};
157-
if response.is_none() {
165+
166+
let exception =
167+
match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
168+
Ok(_) => None,
169+
Err(error) => Some(error),
170+
};
171+
172+
match self.handle_error(oneway_rpc, opaque, exception).await {
173+
HandleErrorResult::Continue => continue,
174+
HandleErrorResult::ReturnMethod => return Ok(()),
175+
HandleErrorResult::GoHead => {}
176+
}
177+
if response.is_none() || oneway_rpc {
158178
continue;
159179
}
160-
exception = match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) {
161-
Ok(_) => None,
162-
Err(error) => Some(error),
163-
};
180+
let response = response.unwrap();
181+
tokio::select! {
182+
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
183+
Ok(_) =>{},
184+
Err(err) => {
185+
match err {
186+
Error::Io(io_error) => {
187+
error!("connection disconnect: {}", io_error);
188+
return Ok(())
189+
}
190+
_ => { error!("send response failed: {}", err);}
191+
}
192+
},
193+
},
194+
}
195+
}
196+
Ok(())
197+
}
164198

165-
if let Some(exception_inner) = exception {
166-
match exception_inner {
167-
Error::AbortProcessException(code, message) => {
199+
async fn handle_error(
200+
&mut self,
201+
oneway_rpc: bool,
202+
opaque: i32,
203+
exception: Option<Error>,
204+
) -> HandleErrorResult {
205+
if let Some(exception_inner) = exception {
206+
match exception_inner {
207+
Error::AbortProcessException(code, message) => {
208+
if oneway_rpc {
209+
return HandleErrorResult::Continue;
210+
}
211+
let response =
212+
RemotingCommand::create_response_command_with_code_remark(code, message);
213+
tokio::select! {
214+
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
215+
Ok(_) =>{},
216+
Err(err) => {
217+
match err {
218+
Error::Io(io_error) => {
219+
error!("send response failed: {}", io_error);
220+
return HandleErrorResult::ReturnMethod;
221+
}
222+
_ => { error!("send response failed: {}", err);}
223+
}
224+
},
225+
},
226+
}
227+
}
228+
_ => {
229+
if !oneway_rpc {
168230
let response = RemotingCommand::create_response_command_with_code_remark(
169-
code, message,
231+
ResponseCode::SystemError,
232+
exception_inner.to_string(),
170233
);
171234
tokio::select! {
172235
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
@@ -175,61 +238,29 @@ impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
175238
match err {
176239
Error::Io(io_error) => {
177240
error!("send response failed: {}", io_error);
178-
return Ok(())
241+
return HandleErrorResult::ReturnMethod;
179242
}
180243
_ => { error!("send response failed: {}", err);}
181244
}
182245
},
183246
},
184247
}
185248
}
186-
_ => {
187-
if !oneway_rpc {
188-
let response =
189-
RemotingCommand::create_response_command_with_code_remark(
190-
ResponseCode::SystemError,
191-
exception_inner.to_string(),
192-
);
193-
tokio::select! {
194-
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
195-
Ok(_) =>{},
196-
Err(err) => {
197-
match err {
198-
Error::Io(io_error) => {
199-
error!("send response failed: {}", io_error);
200-
return Ok(())
201-
}
202-
_ => { error!("send response failed: {}", err);}
203-
}
204-
},
205-
},
206-
}
207-
}
208-
}
209249
}
210-
continue;
211-
}
212-
213-
let response = response.unwrap();
214-
tokio::select! {
215-
result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{
216-
Ok(_) =>{},
217-
Err(err) => {
218-
match err {
219-
Error::Io(io_error) => {
220-
error!("send response failed: {}", io_error);
221-
return Ok(())
222-
}
223-
_ => { error!("send response failed: {}", err);}
224-
}
225-
},
226-
},
227250
}
251+
HandleErrorResult::Continue
252+
} else {
253+
HandleErrorResult::GoHead
228254
}
229-
Ok(())
230255
}
231256
}
232257

258+
enum HandleErrorResult {
259+
Continue,
260+
ReturnMethod,
261+
GoHead,
262+
}
263+
233264
/// Server listener state. Created in the `run` call. It includes a `run` method
234265
/// which performs the TCP listening and initialization of per-connection state.
235266
struct ConnectionListener<RP> {

0 commit comments

Comments
 (0)