Skip to content

Commit cc82fac

Browse files
authored
[ISSUE #6]Optimize the deserialization of RemotingCommand (#17)
* [ISSUE #6]Optimize the deserialization of RemotingCommand * fix ci error * fix ci error
1 parent 7ca5a85 commit cc82fac

File tree

2 files changed

+8
-13
lines changed

2 files changed

+8
-13
lines changed

namesrv/src/namesrv_bootstrap.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,10 @@ impl Shared {
117117

118118
async fn process(&mut self, client_addr: SocketAddr, cmd: RemotingCommand) {
119119
if let Some(tx) = self.peers.get_mut(&client_addr) {
120+
let opaque = cmd.opaque();
120121
let result = self.broker_request_processor.process_request(cmd);
121-
let _ = tx.send(result);
122+
//Broker handling compatible with the Java platform.
123+
let _ = tx.send(result.set_opaque(opaque));
122124
}
123125
}
124126
}

namesrv/src/processor/broker_request_processor.rs

+5-12
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
1718
use bytes::Bytes;
1819
use rocketmq_common::{
1920
common::{mix_all, mq_version::RocketMqVersion},
@@ -31,7 +32,7 @@ use rocketmq_remoting::{
3132
RemotingSerializable,
3233
},
3334
};
34-
use tracing::{info, warn};
35+
use tracing::warn;
3536

3637
use crate::route::route_info_manager::RouteInfoManager;
3738

@@ -67,17 +68,14 @@ impl BrokerRequestProcessor {
6768
let request_header = request
6869
.decode_command_custom_header::<RegisterBrokerRequestHeader>()
6970
.unwrap();
70-
let opaque = request.opaque();
71-
7271
if !check_sum_crc32(&request, &request_header) {
7372
return RemotingCommand::create_response_command_with_code(
7473
RemotingSysResponseCode::SystemError as i32,
7574
)
76-
.set_remark(Some(String::from("crc32 not match")))
77-
.set_opaque(opaque);
75+
.set_remark(Some(String::from("crc32 not match")));
7876
}
7977

80-
let response_command = RemotingCommand::create_response_command().set_opaque(opaque);
78+
let response_command = RemotingCommand::create_response_command();
8179
let broker_version = RocketMqVersion::try_from(request.version()).unwrap();
8280
let topic_config_wrapper;
8381
let mut filter_server_list = Vec::<String>::new();
@@ -127,11 +125,10 @@ impl BrokerRequestProcessor {
127125
}
128126

129127
impl BrokerRequestProcessor {
130-
fn process_get_broker_cluster_info(&mut self, request: RemotingCommand) -> RemotingCommand {
128+
fn process_get_broker_cluster_info(&mut self, _request: RemotingCommand) -> RemotingCommand {
131129
let vec = self.route_info_manager.get_all_cluster_info().encode(false);
132130
RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success as i32)
133131
.set_body(Some(Bytes::from(vec)))
134-
.set_opaque(request.opaque())
135132
}
136133
}
137134

@@ -168,10 +165,6 @@ fn check_sum_crc32(
168165
}
169166
if let Some(bytes) = &request.get_body() {
170167
let crc_32 = crc32(bytes.iter().as_ref());
171-
info!(
172-
"Rec request body crc32:{}-{}",
173-
request_header.body_crc32, crc_32
174-
);
175168
if crc_32 != request_header.body_crc32 {
176169
warn!(
177170
"receive registerBroker request,crc32 not match,origin:{}, cal:{}",

0 commit comments

Comments
 (0)