Skip to content

Commit 55a9a96

Browse files
authored
[ISSUE #295]Refactor network section code (#296)
1 parent 9bde828 commit 55a9a96

22 files changed

+997
-218
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ members = [
66
"rocketmq-filter",
77
"rocketmq-macros",
88
"rocketmq-namesrv",
9-
"rocketmq-remoting",
9+
"rocketmq-remoting", "rocketmq-runtime",
1010
"rocketmq-store"]
1111
resolver = "2"
1212

@@ -51,4 +51,4 @@ num_cpus = "1.16"
5151
config = "0.14"
5252

5353
parking_lot = "0.12"
54-
dirs = "5.0"
54+
dirs = "5.0"

rocketmq-broker/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ rocketmq-remoting = { version = "0.2.0", path = "../rocketmq-remoting" }
2323
rocketmq-store = { version = "0.2.0", path = "../rocketmq-store", default-features = true }
2424
rocketmq-filter = { version = "0.2.0", path = "../rocketmq-filter" }
2525
rocketmq-macros = { version = "0.2.0", path = "../rocketmq-macros" }
26+
rocketmq-runtime = { version = "0.2.0", path = "../rocketmq-runtime" }
2627

2728
anyhow.workspace = true
2829
env_logger.workspace = true

rocketmq-broker/src/bin/broker_bootstrap_server.rs

+26-29
Original file line numberDiff line numberDiff line change
@@ -15,59 +15,56 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::{path::PathBuf, process::exit};
18+
use std::path::PathBuf;
1919

2020
use clap::Parser;
21-
use rocketmq_broker::{
22-
broker_config::BrokerConfig, broker_controller::BrokerController, command::Args,
23-
};
21+
use rocketmq_broker::{broker_config::BrokerConfig, command::Args, Builder};
2422
use rocketmq_common::{EnvUtils::EnvUtils, ParseConfigFile};
2523
use rocketmq_rust::rocketmq;
2624
use rocketmq_store::config::message_store_config::MessageStoreConfig;
27-
use tracing::{info, warn};
25+
use tracing::info;
2826

2927
#[rocketmq::main]
3028
async fn main() -> anyhow::Result<()> {
3129
// init logger
3230
rocketmq_common::log::init_logger();
33-
let controller = create_broker_controller()?;
34-
start_broker_controller(controller).await?;
31+
let (broker_config, message_store_config) = parse_config_file();
32+
// boot strap broker
33+
Builder::new()
34+
.set_broker_config(broker_config)
35+
.set_message_store_config(message_store_config)
36+
.build()
37+
.boot()
38+
.await;
3539
Ok(())
3640
}
3741

38-
fn create_broker_controller() -> anyhow::Result<BrokerController> {
42+
fn parse_config_file() -> (BrokerConfig, MessageStoreConfig) {
3943
let args = Args::parse();
4044
let home = EnvUtils::get_rocketmq_home();
41-
let (broker_config, message_store_config) = if let Some(ref config_file) = args.config_file {
45+
let config = if let Some(ref config_file) = args.config_file {
4246
let config_file = PathBuf::from(config_file);
4347
(
44-
ParseConfigFile::parse_config_file::<BrokerConfig>(config_file.clone())?,
45-
ParseConfigFile::parse_config_file::<MessageStoreConfig>(config_file.clone())?,
48+
ParseConfigFile::parse_config_file::<BrokerConfig>(config_file.clone())
49+
.ok()
50+
.unwrap(),
51+
ParseConfigFile::parse_config_file::<MessageStoreConfig>(config_file.clone())
52+
.ok()
53+
.unwrap(),
4654
)
4755
} else {
4856
let path_buf = PathBuf::from(home.as_str())
4957
.join("conf")
5058
.join("broker.toml");
5159
(
52-
ParseConfigFile::parse_config_file::<BrokerConfig>(path_buf.clone())?,
53-
ParseConfigFile::parse_config_file::<MessageStoreConfig>(path_buf)?,
60+
ParseConfigFile::parse_config_file::<BrokerConfig>(path_buf.clone())
61+
.ok()
62+
.unwrap(),
63+
ParseConfigFile::parse_config_file::<MessageStoreConfig>(path_buf)
64+
.ok()
65+
.unwrap(),
5466
)
5567
};
5668
info!("Rocketmq(Rust) home: {}", home);
57-
Ok(BrokerController::new(broker_config, message_store_config))
58-
}
59-
60-
async fn start_broker_controller(broker_controller: BrokerController) -> anyhow::Result<()> {
61-
let mut broker_controller = broker_controller;
62-
if !broker_controller.initialize() {
63-
warn!("Rocketmq(Rust) start failed, initialize failed");
64-
exit(0);
65-
}
66-
info!(
67-
"Rocketmq name server(Rust) running on {}:{}",
68-
broker_controller.broker_config.broker_ip1, broker_controller.broker_config.listen_port,
69-
);
70-
//start controller
71-
broker_controller.start().await;
72-
Ok(())
69+
config
7370
}
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use rocketmq_remoting::server::config::ServerConfig;
19+
use rocketmq_store::config::message_store_config::MessageStoreConfig;
20+
use tokio::select;
21+
use tracing::error;
22+
23+
use crate::{broker_config::BrokerConfig, broker_runtime::BrokerRuntime};
24+
25+
pub struct BrokerBootstrap {
26+
broker_runtime: BrokerRuntime,
27+
}
28+
29+
impl BrokerBootstrap {
30+
pub async fn boot(mut self) {
31+
if !self.initialize().await {
32+
error!("initialize fail");
33+
return;
34+
}
35+
36+
select! {
37+
_ = self.start() =>{
38+
39+
}
40+
}
41+
}
42+
43+
async fn initialize(&mut self) -> bool {
44+
self.broker_runtime.initialize().await
45+
}
46+
47+
async fn start(&mut self) {
48+
self.broker_runtime.start().await;
49+
}
50+
}
51+
52+
pub struct Builder {
53+
broker_config: BrokerConfig,
54+
message_store_config: MessageStoreConfig,
55+
server_config: ServerConfig,
56+
}
57+
58+
impl Builder {
59+
pub fn new() -> Self {
60+
Builder {
61+
broker_config: Default::default(),
62+
message_store_config: MessageStoreConfig::new(),
63+
server_config: Default::default(),
64+
}
65+
}
66+
67+
pub fn set_broker_config(mut self, broker_config: BrokerConfig) -> Self {
68+
self.broker_config = broker_config;
69+
self
70+
}
71+
pub fn set_message_store_config(mut self, message_store_config: MessageStoreConfig) -> Self {
72+
self.message_store_config = message_store_config;
73+
self
74+
}
75+
76+
pub fn set_server_config(mut self, server_config: ServerConfig) -> Self {
77+
self.server_config = server_config;
78+
self
79+
}
80+
81+
pub fn build(self) -> BrokerBootstrap {
82+
BrokerBootstrap {
83+
broker_runtime: BrokerRuntime::new(
84+
self.broker_config,
85+
self.message_store_config,
86+
self.server_config,
87+
),
88+
}
89+
}
90+
}
91+
92+
impl Default for Builder {
93+
fn default() -> Self {
94+
Self::new()
95+
}
96+
}

0 commit comments

Comments
 (0)