Skip to content

Commit 6cb2aa1

Browse files
authored
[ISSUE #169]🚀Implementing network communication for the Broker (#205)
* [ISSUE #169]🚀Implementing network communication for the Broker * add test case
1 parent 8a95a97 commit 6cb2aa1

File tree

18 files changed

+700
-23
lines changed

18 files changed

+700
-23
lines changed

rocketmq-broker/src/bin/broker_bootstrap_server.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ async fn start_broker_controller(broker_controller: BrokerController) -> anyhow:
6767
"Rocketmq name server(Rust) running on {}:{}",
6868
broker_controller.broker_config.broker_ip1, broker_controller.broker_config.listen_port,
6969
);
70-
let future = broker_controller.start();
71-
join!(future);
70+
join!(broker_controller.start());
7271
Ok(())
7372
}

rocketmq-broker/src/broker_controller.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl BrokerController {
174174

175175
if let Some(ref mut broker_server) = self.broker_server {
176176
broker_server.start().await;
177-
};
177+
}
178178

179179
//other service start
180180
}

rocketmq-broker/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod coldctr;
2626
mod controller;
2727
mod filter;
2828
mod longpolling;
29+
mod mqtrace;
2930
mod offset;
3031
mod processor;
3132
mod schedule;

rocketmq-broker/src/mqtrace.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub(crate) mod send_message_context;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::{any::Any, collections::HashMap};
18+
19+
use rocketmq_common::common::message::message_enum::MessageType;
20+
use rocketmq_store::status::StatsType;
21+
22+
#[derive(Debug, Default)]
23+
pub struct SendMessageContext {
24+
pub namespace: String,
25+
pub producer_group: String,
26+
pub topic: String,
27+
pub msg_id: String,
28+
pub origin_msg_id: String,
29+
pub queue_id: Option<i32>,
30+
pub queue_offset: Option<i64>,
31+
pub broker_addr: String,
32+
pub born_host: String,
33+
pub body_length: i32,
34+
pub code: i32,
35+
pub error_msg: String,
36+
pub msg_props: String,
37+
pub mq_trace_context: Option<Box<dyn Any>>,
38+
pub ext_props: HashMap<String, String>,
39+
pub broker_region_id: String,
40+
pub msg_unique_key: String,
41+
pub born_time_stamp: i64,
42+
pub request_time_stamp: i64,
43+
pub msg_type: MessageType,
44+
pub is_success: bool,
45+
pub account_auth_type: String,
46+
pub account_owner_parent: String,
47+
pub account_owner_self: String,
48+
pub send_msg_num: i32,
49+
pub send_msg_size: i32,
50+
pub send_stat: StatsType,
51+
pub commercial_send_msg_num: i32,
52+
pub commercial_owner: String,
53+
pub commercial_send_stats: StatsType,
54+
pub commercial_send_size: i32,
55+
pub commercial_send_times: i32,
56+
}

rocketmq-broker/src/processor/send_message_processor.rs

+30-4
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,45 @@
1515
* limitations under the License.
1616
*/
1717
use rocketmq_remoting::{
18-
protocol::remoting_command::RemotingCommand,
18+
code::request_code::RequestCode,
19+
protocol::{
20+
header::message_operation_header::send_message_request_header::parse_request_header,
21+
remoting_command::RemotingCommand,
22+
},
1923
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
2024
};
2125

2226
#[derive(Default)]
23-
pub struct SendMessageProcessor {}
27+
pub struct SendMessageProcessor {
28+
inner: SendMessageProcessorInner,
29+
}
2430

2531
impl RequestProcessor for SendMessageProcessor {
2632
fn process_request(
2733
&mut self,
28-
_ctx: ConnectionHandlerContext,
29-
_request: RemotingCommand,
34+
ctx: ConnectionHandlerContext,
35+
request: RemotingCommand,
3036
) -> RemotingCommand {
37+
let request_code = RequestCode::from(request.code());
38+
match request_code {
39+
RequestCode::ConsumerSendMsgBack => self.inner.consumer_send_msg_back(&ctx, &request),
40+
_ => {
41+
let _request_header = parse_request_header(&request).unwrap();
42+
}
43+
}
44+
RemotingCommand::create_response_command()
45+
}
46+
}
47+
48+
#[derive(Default)]
49+
struct SendMessageProcessorInner {}
50+
51+
impl SendMessageProcessorInner {
52+
fn consumer_send_msg_back(
53+
&mut self,
54+
_ctx: &ConnectionHandlerContext,
55+
_request: &RemotingCommand,
56+
) {
3157
todo!()
3258
}
3359
}

rocketmq-common/src/common/message.rs

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::{
2323
use lazy_static::lazy_static;
2424

2525
pub mod message_batch;
26+
pub mod message_enum;
2627
pub mod message_id;
2728
pub mod message_queue;
2829
pub mod message_single;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#[derive(Debug, PartialEq, Copy, Clone, Default)]
18+
pub enum MessageType {
19+
#[default]
20+
NormalMsg,
21+
TransMsgHalf,
22+
TransMsgCommit,
23+
DelayMsg,
24+
OrderMsg,
25+
}
26+
27+
impl MessageType {
28+
pub fn get_short_name(&self) -> &'static str {
29+
match self {
30+
MessageType::NormalMsg => "Normal",
31+
MessageType::TransMsgHalf => "Trans",
32+
MessageType::TransMsgCommit => "TransCommit",
33+
MessageType::DelayMsg => "Delay",
34+
MessageType::OrderMsg => "Order",
35+
}
36+
}
37+
38+
pub fn get_by_short_name(short_name: &str) -> MessageType {
39+
match short_name {
40+
"Normal" => MessageType::NormalMsg,
41+
"Trans" => MessageType::TransMsgHalf,
42+
"TransCommit" => MessageType::TransMsgCommit,
43+
"Delay" => MessageType::DelayMsg,
44+
"Order" => MessageType::OrderMsg,
45+
_ => MessageType::NormalMsg,
46+
}
47+
}
48+
}
49+
50+
#[derive(Debug, PartialEq, Copy, Clone)]
51+
pub enum MessageRequestMode {
52+
Pull,
53+
Pop,
54+
}
55+
56+
impl MessageRequestMode {
57+
pub fn get_name(&self) -> &'static str {
58+
match self {
59+
MessageRequestMode::Pull => "PULL",
60+
MessageRequestMode::Pop => "POP",
61+
}
62+
}
63+
}
64+
65+
#[cfg(test)]
66+
mod tests {
67+
use super::*;
68+
69+
#[test]
70+
fn test_get_short_name() {
71+
assert_eq!(MessageType::NormalMsg.get_short_name(), "Normal");
72+
assert_eq!(MessageType::TransMsgHalf.get_short_name(), "Trans");
73+
assert_eq!(MessageType::TransMsgCommit.get_short_name(), "TransCommit");
74+
assert_eq!(MessageType::DelayMsg.get_short_name(), "Delay");
75+
assert_eq!(MessageType::OrderMsg.get_short_name(), "Order");
76+
}
77+
78+
#[test]
79+
fn test_get_by_short_name() {
80+
assert_eq!(
81+
MessageType::get_by_short_name("Normal"),
82+
MessageType::NormalMsg
83+
);
84+
assert_eq!(
85+
MessageType::get_by_short_name("Trans"),
86+
MessageType::TransMsgHalf
87+
);
88+
assert_eq!(
89+
MessageType::get_by_short_name("TransCommit"),
90+
MessageType::TransMsgCommit
91+
);
92+
assert_eq!(
93+
MessageType::get_by_short_name("Delay"),
94+
MessageType::DelayMsg
95+
);
96+
assert_eq!(
97+
MessageType::get_by_short_name("Order"),
98+
MessageType::OrderMsg
99+
);
100+
assert_eq!(
101+
MessageType::get_by_short_name("Invalid"),
102+
MessageType::NormalMsg
103+
);
104+
}
105+
106+
#[test]
107+
fn test_get_name() {
108+
assert_eq!(MessageRequestMode::Pull.get_name(), "PULL");
109+
assert_eq!(MessageRequestMode::Pop.get_name(), "POP");
110+
}
111+
}

rocketmq-remoting/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#![feature(future_join)]
12
/*
23
* Licensed to the Apache Software Foundation (ASF) under one or more
34
* contributor license agreements. See the NOTICE file distributed with
@@ -23,6 +24,7 @@ pub mod error;
2324
pub mod net;
2425
pub mod protocol;
2526
pub mod remoting;
27+
pub mod rpc;
2628
pub mod runtime;
2729
pub mod server;
2830

rocketmq-remoting/src/protocol/header.rs

+1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616
*/
1717
pub mod broker;
1818
pub mod client_request_header;
19+
pub mod message_operation_header;
1920
pub mod namesrv;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub mod send_message_request_header;

0 commit comments

Comments
 (0)