Skip to content

Commit 29a2345

Browse files
authored
[ISSUE mxsm#703]🚀Implement RpcClientUtils#encode_body method🚀 (mxsm#707)
1 parent 01cd6f7 commit 29a2345

25 files changed

+68
-164
lines changed

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,3 @@ struct ConsumerOffsetWrapper {
223223
#[serde(skip)]
224224
version_change_counter: Arc<AtomicI64>,
225225
}
226-
227-
impl RemotingSerializable for ConsumerOffsetWrapper {
228-
type Output = Self;
229-
}

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use dns_lookup::lookup_host;
2020
use rocketmq_common::common::broker::broker_config::BrokerIdentity;
2121
use rocketmq_common::common::config::TopicConfig;
2222
use rocketmq_common::utils::crc32_utils;
23+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2324
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
2425
use rocketmq_remoting::clients::RemotingClient;
2526
use rocketmq_remoting::code::request_code::RequestCode;
@@ -209,7 +210,7 @@ impl BrokerOuterAPI {
209210
result.master_addr = header.master_addr.clone().unwrap_or("".to_string());
210211
}
211212
if let Some(body) = response.body() {
212-
result.kv_table = KVTable::decode(body.as_ref());
213+
result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref());
213214
}
214215
Some(result)
215216
}

rocketmq-broker/src/processor/client_manage_processor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use rocketmq_common::common::mix_all;
2424
use rocketmq_common::common::mix_all::IS_SUB_CHANGE;
2525
use rocketmq_common::common::mix_all::IS_SUPPORT_HEART_BEAT_V2;
2626
use rocketmq_common::common::sys_flag::topic_sys_flag;
27+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2728
use rocketmq_remoting::code::request_code::RequestCode;
2829
use rocketmq_remoting::protocol::header::unregister_client_request_header::UnregisterClientRequestHeader;
2930
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
3031
use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
3132
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
32-
use rocketmq_remoting::protocol::RemotingSerializable;
3333
use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
3434
use rocketmq_store::log_file::MessageStore;
3535
use tracing::info;
@@ -129,8 +129,9 @@ where
129129
ctx: ConnectionHandlerContext<'_>,
130130
request: RemotingCommand,
131131
) -> Option<RemotingCommand> {
132-
let heartbeat_data =
133-
HeartbeatData::decode(request.body().as_ref().map(|v| v.as_ref()).unwrap());
132+
let heartbeat_data = SerdeJsonUtils::decode::<HeartbeatData>(
133+
request.body().as_ref().map(|v| v.as_ref()).unwrap(),
134+
);
134135
let client_channel_info = ClientChannelInfo::new(
135136
ctx.as_ref().connection().channel().clone(),
136137
heartbeat_data.client_id.clone(),

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,3 @@ impl SubscriptionGroupWrapper {
204204
&self.forbidden_table
205205
}
206206
}
207-
208-
impl RemotingSerializable for SubscriptionGroupWrapper {
209-
type Output = Self;
210-
}

rocketmq-common/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ pub mod file_utils;
2121
pub mod message_utils;
2222
pub mod parse_config_file;
2323
pub mod queue_type_utils;
24+
pub mod serde_json_utils;
2425
pub mod time_utils;
2526
pub mod util_all;

rocketmq-common/src/utils/serde_json_utils.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ use crate::error::SerdeJsonError;
1919
pub struct SerdeJsonUtils;
2020

2121
impl SerdeJsonUtils {
22+
pub fn decode<T>(bytes: &[u8]) -> T
23+
where
24+
T: serde::de::DeserializeOwned,
25+
{
26+
serde_json::from_slice::<T>(bytes).unwrap()
27+
}
28+
2229
pub fn from_json<T>(json: &str) -> Result<T, SerdeJsonError>
2330
where
2431
T: serde::de::DeserializeOwned,

rocketmq-namesrv/src/kvconfig.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use std::collections::HashMap;
1919

20-
use rocketmq_remoting::protocol::RemotingSerializable;
2120
use serde::Deserialize;
2221
use serde::Serialize;
2322

@@ -45,7 +44,3 @@ impl KVConfigSerializeWrapper {
4544
}
4645
}
4746
}
48-
49-
impl RemotingSerializable for KVConfigSerializeWrapper {
50-
type Output = Self;
51-
}

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::collections::HashMap;
1818
use std::sync::Arc;
1919

2020
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
21+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2122
use rocketmq_common::FileUtils;
2223
use rocketmq_remoting::protocol::body::kv_table::KVTable;
2324
use rocketmq_remoting::protocol::RemotingSerializable;
@@ -75,7 +76,7 @@ impl KVConfigManager {
7576
pub fn load(&mut self) {
7677
let result = FileUtils::file_to_string(self.namesrv_config.kv_config_path.as_str());
7778
if let Ok(content) = result {
78-
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes());
79+
let wrapper = SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes());
7980
if let Some(ref config_table) = wrapper.config_table {
8081
for (namespace, config) in config_table {
8182
self.config_table.insert(namespace.clone(), config.clone());

rocketmq-namesrv/src/processor/default_request_processor.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use bytes::Bytes;
2222
use rocketmq_common::common::mix_all;
2323
use rocketmq_common::common::mq_version::RocketMqVersion;
2424
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
25+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2526
use rocketmq_common::CRC32Utils;
2627
use rocketmq_remoting::code::request_code::RequestCode;
2728
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
@@ -176,8 +177,9 @@ impl DefaultRequestProcessor {
176177
let request_header = request
177178
.decode_command_custom_header::<QueryDataVersionRequestHeader>()
178179
.unwrap();
179-
let data_version =
180-
DataVersion::decode(request.body().as_ref().map(|v| v.as_ref()).unwrap());
180+
let data_version = SerdeJsonUtils::decode::<DataVersion>(
181+
request.body().as_ref().map(|v| v.as_ref()).unwrap(),
182+
);
181183
let changed = self
182184
.route_info_manager
183185
.read()
@@ -407,7 +409,7 @@ impl DefaultRequestProcessor {
407409
.decode_command_custom_header::<RegisterTopicRequestHeader>()
408410
.unwrap();
409411
if let Some(ref body) = request.body() {
410-
let topic_route_data = TopicRouteData::decode(body);
412+
let topic_route_data = SerdeJsonUtils::decode::<TopicRouteData>(body);
411413
if !topic_route_data.queue_datas.is_empty() {
412414
self.route_info_manager
413415
.write()
@@ -514,7 +516,9 @@ fn extract_register_topic_config_from_request(
514516
request: &RemotingCommand,
515517
) -> TopicConfigAndMappingSerializeWrapper {
516518
if let Some(body_inner) = request.body() {
517-
return TopicConfigAndMappingSerializeWrapper::decode(body_inner.iter().as_slice());
519+
return SerdeJsonUtils::decode::<TopicConfigAndMappingSerializeWrapper>(
520+
body_inner.iter().as_slice(),
521+
);
518522
}
519523
TopicConfigAndMappingSerializeWrapper::default()
520524
}

rocketmq-remoting/src/protocol.rs

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use std::time::SystemTime;
2424

2525
use rocketmq_common::common::mix_all;
2626
use rocketmq_common::common::topic::TopicValidator;
27+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2728
use rocketmq_common::utils::time_utils;
28-
use serde::de;
2929
use serde::ser::SerializeStruct;
3030
use serde::Deserialize;
3131
use serde::Serialize;
@@ -222,10 +222,6 @@ impl<'de> Deserialize<'de> for DataVersion {
222222
}
223223
}
224224

225-
impl RemotingSerializable for DataVersion {
226-
type Output = DataVersion;
227-
}
228-
229225
impl Clone for DataVersion {
230226
fn clone(&self) -> Self {
231227
DataVersion {
@@ -326,44 +322,24 @@ impl Display for DataVersion {
326322

327323
/// A trait for types that can be deserialized from a byte vector.
328324
pub trait RemotingSerializable {
329-
/// The output type after deserialization.
330-
type Output;
331-
332-
/// Decode a byte vector into the corresponding type.
333-
///
334-
/// # Arguments
335-
///
336-
/// * `bytes` - The byte vector to be deserialized.
337-
///
338-
/// # Returns
339-
///
340-
/// The deserialized output of type `Self::Output`.
341-
fn decode<'a>(bytes: &'a [u8]) -> Self::Output
342-
where
343-
Self::Output: de::Deserialize<'a>,
344-
{
345-
serde_json::from_slice::<Self::Output>(bytes).unwrap()
346-
}
325+
fn encode(&self) -> Vec<u8>;
326+
fn to_json(&self) -> String;
327+
fn to_json_pretty(&self) -> String;
328+
}
347329

348-
fn encode(&self) -> Vec<u8>
349-
where
350-
Self: Serialize,
351-
{
352-
serde_json::to_vec(self).unwrap()
330+
pub trait JsonSerializable: Serialize + RemotingSerializable {}
331+
332+
impl<T: Serialize> RemotingSerializable for T {
333+
fn encode(&self) -> Vec<u8> {
334+
SerdeJsonUtils::to_json_vec(self).unwrap()
353335
}
354336

355-
fn to_json(&self) -> String
356-
where
357-
Self: Serialize,
358-
{
359-
serde_json::to_string(self).unwrap()
337+
fn to_json(&self) -> String {
338+
SerdeJsonUtils::to_json(self).unwrap()
360339
}
361340

362-
fn to_json_pretty(&self) -> String
363-
where
364-
Self: Serialize,
365-
{
366-
serde_json::to_string_pretty(self).unwrap()
341+
fn to_json_pretty(&self) -> String {
342+
SerdeJsonUtils::to_json_pretty(self).unwrap()
367343
}
368344
}
369345

rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ use std::collections::HashMap;
1919
use serde::Deserialize;
2020
use serde::Serialize;
2121

22-
use crate::protocol::RemotingSerializable;
23-
2422
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2523
#[serde(rename_all = "camelCase")]
2624
pub struct BrokerMemberGroup {
@@ -43,16 +41,8 @@ impl BrokerMemberGroup {
4341
}
4442
}
4543

46-
impl RemotingSerializable for BrokerMemberGroup {
47-
type Output = BrokerMemberGroup;
48-
}
49-
5044
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
5145
#[serde(rename_all = "camelCase")]
5246
pub struct GetBrokerMemberGroupResponseBody {
5347
pub broker_member_group: Option<BrokerMemberGroup>,
5448
}
55-
56-
impl RemotingSerializable for GetBrokerMemberGroupResponseBody {
57-
type Output = GetBrokerMemberGroupResponseBody;
58-
}

rocketmq-remoting/src/protocol/body/broker_body/cluster_info.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use serde::Deserialize;
2222
use serde::Serialize;
2323

2424
use crate::protocol::route::route_data_view::BrokerData;
25-
use crate::protocol::RemotingSerializable;
2625

2726
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2827
pub struct ClusterInfo {
@@ -33,18 +32,6 @@ pub struct ClusterInfo {
3332
cluster_addr_table: Option<HashMap<String, HashSet<String>>>,
3433
}
3534

36-
impl RemotingSerializable for ClusterInfo {
37-
type Output = ClusterInfo;
38-
39-
/*fn decode(bytes: &[u8]) -> ClusterInfo {
40-
serde_json::from_slice::<Self::Output>(bytes).unwrap()
41-
}
42-
43-
fn encode(&self, _compress: bool) -> Vec<u8> {
44-
serde_json::to_vec(self).unwrap()
45-
}*/
46-
}
47-
4835
impl ClusterInfo {
4936
pub fn new(
5037
broker_addr_table: Option<HashMap<String, BrokerData>>,

rocketmq-remoting/src/protocol/body/broker_body/register_broker_body.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use bytes::Bytes;
1919
use rocketmq_common::common::mq_version::RocketMqVersion;
20+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2021
use serde::Deserialize;
2122
use serde::Serialize;
2223

@@ -66,20 +67,8 @@ impl RegisterBrokerBody {
6667
_broker_version: RocketMqVersion,
6768
) -> RegisterBrokerBody {
6869
if !compressed {
69-
return <RegisterBrokerBody as RemotingSerializable>::decode(bytes.iter().as_slice());
70+
return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice());
7071
}
7172
todo!()
7273
}
7374
}
74-
75-
impl RemotingSerializable for RegisterBrokerBody {
76-
type Output = RegisterBrokerBody;
77-
78-
/* fn decode(bytes: &[u8]) -> Self::Output {
79-
serde_json::from_slice::<Self::Output>(bytes).unwrap()
80-
}
81-
82-
fn encode(&self, _compress: bool) -> Vec<u8> {
83-
todo!()
84-
}*/
85-
}

rocketmq-remoting/src/protocol/body/get_consumer_listby_group_response_body.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,8 @@
1717
use serde::Deserialize;
1818
use serde::Serialize;
1919

20-
use crate::protocol::RemotingSerializable;
21-
2220
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2321
#[serde(rename_all = "camelCase")]
2422
pub struct GetConsumerListByGroupResponseBody {
2523
pub consumer_id_list: Vec<String>,
2624
}
27-
28-
impl RemotingSerializable for GetConsumerListByGroupResponseBody {
29-
type Output = ();
30-
}

rocketmq-remoting/src/protocol/body/kv_table.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,7 @@ use std::collections::HashMap;
2020
use serde::Deserialize;
2121
use serde::Serialize;
2222

23-
use crate::protocol::RemotingSerializable;
24-
2523
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
2624
pub struct KVTable {
2725
pub table: HashMap<String, String>,
2826
}
29-
30-
impl RemotingSerializable for KVTable {
31-
type Output = Self;
32-
}

rocketmq-remoting/src/protocol/body/topic/topic_list.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,13 @@
1717
use serde::Deserialize;
1818
use serde::Serialize;
1919

20-
use crate::protocol::RemotingSerializable;
21-
2220
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2321
#[serde(rename_all = "camelCase")]
2422
pub struct TopicList {
2523
pub topic_list: Vec<String>,
2624
pub broker_addr: Option<String>,
2725
}
2826

29-
impl RemotingSerializable for TopicList {
30-
type Output = TopicList;
31-
}
32-
3327
#[cfg(test)]
3428
mod tests {
3529
use super::*;

rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use serde::Deserialize;
2222
use serde::Serialize;
2323

2424
use crate::protocol::DataVersion;
25-
use crate::protocol::RemotingSerializable;
2625

2726
pub mod topic_config_wrapper;
2827
pub mod topic_queue_wrapper;
@@ -74,7 +73,3 @@ impl TopicConfigSerializeWrapper {
7473
self.data_version = data_version;
7574
}
7675
}
77-
78-
impl RemotingSerializable for TopicConfigSerializeWrapper {
79-
type Output = Self;
80-
}

rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use serde::Serialize;
2323
use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
2424
use crate::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
2525
use crate::protocol::DataVersion;
26-
use crate::protocol::RemotingSerializable;
2726

2827
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2928
pub struct TopicConfigAndMappingSerializeWrapper {
@@ -75,10 +74,6 @@ impl Default for TopicConfigAndMappingSerializeWrapper {
7574
}
7675
}
7776

78-
impl RemotingSerializable for TopicConfigAndMappingSerializeWrapper {
79-
type Output = TopicConfigAndMappingSerializeWrapper;
80-
}
81-
8277
#[cfg(test)]
8378
mod tests {
8479
use std::collections::HashMap;

0 commit comments

Comments
 (0)