Skip to content

Commit 93ecdf4

Browse files
authored
[ISSUE #191]🚀Implementing network communication for the Broker-3 (#192)
1 parent c3bbaa7 commit 93ecdf4

File tree

16 files changed

+295
-53
lines changed

16 files changed

+295
-53
lines changed

rocketmq-broker/src/bin/broker_bootstrap_server.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
use std::{path::PathBuf, process::exit};
1919

2020
use clap::Parser;
21-
use rocketmq_broker::{broker_controller::BrokerController, command::Args};
22-
use rocketmq_common::{
23-
common::broker::broker_config::BrokerConfig, EnvUtils::EnvUtils, ParseConfigFile,
21+
use rocketmq_broker::{
22+
broker_config::BrokerConfig, broker_controller::BrokerController, command::Args,
2423
};
24+
use rocketmq_common::{EnvUtils::EnvUtils, ParseConfigFile};
2525
use rocketmq_rust::rocketmq;
2626
use rocketmq_store::config::message_store_config::MessageStoreConfig;
2727
use tracing::info;

rocketmq-broker/src/broker_config.rs

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use rocketmq_common::common::{
19+
broker::broker_config::{BrokerIdentity, TimerWheelConfig, TopicConfig, TopicQueueConfig},
20+
topic::TopicValidator,
21+
};
22+
use rocketmq_remoting::server::config::BrokerServerConfig;
23+
use serde::Deserialize;
24+
25+
#[derive(Debug, Deserialize)]
26+
#[serde(rename_all = "camelCase")]
27+
pub struct BrokerConfig {
28+
pub broker_identity: BrokerIdentity,
29+
30+
pub topic_config: TopicConfig,
31+
32+
pub topic_queue_config: TopicQueueConfig,
33+
34+
pub timer_wheel_config: TimerWheelConfig,
35+
36+
pub broker_server_config: BrokerServerConfig,
37+
38+
pub broker_ip1: String,
39+
pub broker_ip2: Option<String>,
40+
pub listen_port: u32,
41+
pub trace_topic_enable: bool,
42+
pub msg_trace_topic_name: String,
43+
pub enable_controller_mode: bool,
44+
}
45+
46+
impl Default for BrokerConfig {
47+
fn default() -> Self {
48+
let broker_identity = BrokerIdentity::new();
49+
let broker_ip1 = String::from("127.0.0.1");
50+
let broker_ip2 = None;
51+
let listen_port = 10911;
52+
53+
BrokerConfig {
54+
broker_identity,
55+
topic_config: TopicConfig::default(),
56+
topic_queue_config: TopicQueueConfig::default(),
57+
timer_wheel_config: TimerWheelConfig::default(),
58+
broker_server_config: Default::default(),
59+
broker_ip1,
60+
broker_ip2,
61+
listen_port,
62+
trace_topic_enable: false,
63+
msg_trace_topic_name: TopicValidator::RMQ_SYS_TRACE_TOPIC.to_string(),
64+
enable_controller_mode: false,
65+
}
66+
}
67+
}

rocketmq-broker/src/broker_controller.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
*/
1717
use std::net::SocketAddr;
1818

19-
use rocketmq_common::common::{broker::broker_config::BrokerConfig, config_manager::ConfigManager};
19+
use rocketmq_common::common::config_manager::ConfigManager;
20+
use rocketmq_remoting::{
21+
remoting::RemotingService, server::rocketmq_server::RocketmqDefaultServer,
22+
};
2023
use rocketmq_store::{
2124
base::store_enum::StoreType, config::message_store_config::MessageStoreConfig,
2225
log_file::MessageStore, message_store::local_file_store::LocalFileMessageStore,
@@ -26,6 +29,7 @@ use rocketmq_store::{
2629
use tracing::{info, warn};
2730

2831
use crate::{
32+
broker_config::BrokerConfig,
2933
broker_outer_api::BrokerOuterAPI,
3034
client::{
3135
default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener,
@@ -36,6 +40,7 @@ use crate::{
3640
cold_data_cg_ctr_service::ColdDataCgCtrService,
3741
cold_data_pull_request_hold_service::ColdDataPullRequestHoldService,
3842
},
43+
controller::replicas_manager::ReplicasManager,
3944
filter::manager::consumer_filter_manager::ConsumerFilterManager,
4045
longpolling::{
4146
longpolling_service::pull_request_hold_service::PullRequestHoldService,
@@ -96,6 +101,9 @@ pub struct BrokerController {
96101
pub(crate) broker_outer_api: BrokerOuterAPI,
97102
pub(crate) message_store: Option<Box<dyn MessageStore>>,
98103
pub(crate) timer_message_store: Option<TimerMessageStore>,
104+
pub(crate) replicas_manager: Option<ReplicasManager>,
105+
pub(crate) broker_server: Option<RocketmqDefaultServer>,
106+
pub(crate) fast_broker_server: Option<RocketmqDefaultServer>,
99107
}
100108

101109
impl BrokerController {
@@ -137,12 +145,33 @@ impl BrokerController {
137145
broker_outer_api: Default::default(),
138146
message_store: None,
139147
timer_message_store: None,
148+
replicas_manager: None,
149+
broker_server: None,
150+
fast_broker_server: None,
140151
}
141152
}
142153
}
143154

144155
impl BrokerController {
145-
pub async fn start(&mut self) {}
156+
pub async fn start(&mut self) {
157+
if self.message_store.is_some() {
158+
let _ = self.message_store.as_mut().unwrap().start();
159+
}
160+
161+
if let Some(ref mut timer_message_store) = self.timer_message_store {
162+
timer_message_store.start();
163+
}
164+
165+
if let Some(ref mut replicas_manager) = self.replicas_manager {
166+
replicas_manager.start();
167+
}
168+
169+
if let Some(ref mut broker_server) = self.broker_server {
170+
broker_server.start().await;
171+
}
172+
173+
//other service start
174+
}
146175

147176
pub fn initialize(&mut self) -> bool {
148177
let mut result = self.initialize_metadata();
@@ -206,7 +235,13 @@ impl BrokerController {
206235
result
207236
}
208237

209-
fn initialize_remoting_server(&mut self) {}
238+
fn initialize_remoting_server(&mut self) {
239+
let broker_server =
240+
RocketmqDefaultServer::new(self.broker_config.broker_server_config.clone());
241+
self.broker_server = Some(broker_server);
242+
243+
// fast broker server implementation in future versions
244+
}
210245

211246
fn initialize_resources(&mut self) {}
212247

rocketmq-broker/src/controller.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub(crate) mod replicas_manager;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#[derive(Default)]
18+
pub struct ReplicasManager {}
19+
20+
impl ReplicasManager {
21+
pub fn start(&mut self) {}
22+
}

rocketmq-broker/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
pub mod broker_controller;
2020
pub mod command;
2121

22+
pub mod broker_config;
2223
mod broker_outer_api;
2324
mod client;
2425
mod coldctr;
26+
mod controller;
2527
mod filter;
2628
mod longpolling;
2729
mod offset;

rocketmq-broker/src/topic/manager/topic_config_manager.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
use std::{collections::HashMap, sync::Arc};
1919

2020
use rocketmq_common::common::{
21-
broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager,
22-
constant::PermName, mix_all, topic::TopicValidator,
21+
config::TopicConfig, config_manager::ConfigManager, constant::PermName, mix_all,
22+
topic::TopicValidator,
2323
};
2424
use rocketmq_remoting::protocol::DataVersion;
2525

26+
use crate::broker_config::BrokerConfig;
27+
2628
#[derive(Default)]
2729
pub(crate) struct TopicConfigManager {
2830
pub consumer_order_info_manager: String,

rocketmq-common/src/common/broker/broker_config.rs

+1-42
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct BrokerIdentity {
3030
}
3131

3232
impl BrokerIdentity {
33-
fn new() -> Self {
33+
pub fn new() -> Self {
3434
let broker_name = default_broker_name();
3535
let broker_cluster_name = String::from("DefaultCluster");
3636
let broker_id = mix_all::MASTER_ID;
@@ -86,47 +86,6 @@ fn default_broker_name() -> String {
8686
String::from("DefaultBrokerName")
8787
}
8888

89-
#[derive(Debug, Deserialize)]
90-
#[serde(rename_all = "camelCase")]
91-
pub struct BrokerConfig {
92-
pub broker_identity: BrokerIdentity,
93-
94-
pub topic_config: TopicConfig,
95-
96-
pub topic_queue_config: TopicQueueConfig,
97-
98-
pub timer_wheel_config: TimerWheelConfig,
99-
100-
pub broker_ip1: String,
101-
pub broker_ip2: Option<String>,
102-
pub listen_port: u32,
103-
pub trace_topic_enable: bool,
104-
pub msg_trace_topic_name: String,
105-
pub enable_controller_mode: bool,
106-
}
107-
108-
impl Default for BrokerConfig {
109-
fn default() -> Self {
110-
let broker_identity = BrokerIdentity::new();
111-
let broker_ip1 = String::from("127.0.0.1");
112-
let broker_ip2 = None;
113-
let listen_port = 10911;
114-
115-
BrokerConfig {
116-
broker_identity,
117-
topic_config: TopicConfig::default(),
118-
topic_queue_config: TopicQueueConfig::default(),
119-
timer_wheel_config: TimerWheelConfig::default(),
120-
broker_ip1,
121-
broker_ip2,
122-
listen_port,
123-
trace_topic_enable: false,
124-
msg_trace_topic_name: TopicValidator::RMQ_SYS_TRACE_TOPIC.to_string(),
125-
enable_controller_mode: false,
126-
}
127-
}
128-
}
129-
13089
#[derive(Debug, Deserialize)]
13190
#[serde(rename_all = "camelCase")]
13291
pub struct TopicConfig {

rocketmq-remoting/src/runtime.rs

+2
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ pub trait RPCHook {
3131
response: &RemotingCommand,
3232
);
3333
}
34+
35+
pub struct ServerInner {}

rocketmq-remoting/src/server.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
pub mod config;
1819
pub mod rocketmq_server;
1920

2021
use std::sync::Arc;
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use serde::{Deserialize, Serialize};
18+
19+
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
20+
#[serde(rename_all = "camelCase")]
21+
pub struct BrokerServerConfig {}

0 commit comments

Comments
 (0)