Skip to content

[ISSUE #195]🚀Refactor remoting net #196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rocketmq-broker/src/bin/broker_bootstrap_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::{path::PathBuf, process::exit};

use clap::Parser;
use futures_util::join;
use rocketmq_broker::{
broker_config::BrokerConfig, broker_controller::BrokerController, command::Args,
};
Expand Down Expand Up @@ -65,6 +66,7 @@ async fn start_broker_controller(broker_controller: BrokerController) -> anyhow:
"Rocketmq name server(Rust) running on {}:{}",
broker_controller.broker_config.broker_ip1, broker_controller.broker_config.listen_port,
);
broker_controller.start().await;
let x = broker_controller.start();
join!(x);
Ok(())
}
42 changes: 35 additions & 7 deletions rocketmq-broker/src/broker_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::{common::config_manager::ConfigManager, TokioExecutorService};
use rocketmq_remoting::{
remoting::RemotingService, server::rocketmq_server::RocketmqDefaultServer,
code::request_code::RequestCode,
server::{rocketmq_server::RocketmqDefaultServer, RemotingServer},
};
use rocketmq_store::{
base::store_enum::StoreType, config::message_store_config::MessageStoreConfig,
Expand Down Expand Up @@ -104,6 +105,9 @@ pub struct BrokerController {
pub(crate) replicas_manager: Option<ReplicasManager>,
pub(crate) broker_server: Option<RocketmqDefaultServer>,
pub(crate) fast_broker_server: Option<RocketmqDefaultServer>,

//executors
pub(crate) send_message_executor: Arc<TokioExecutorService>,
}

impl BrokerController {
Expand Down Expand Up @@ -148,6 +152,7 @@ impl BrokerController {
replicas_manager: None,
broker_server: None,
fast_broker_server: None,
send_message_executor: Arc::new(Default::default()),
}
}
}
Expand All @@ -166,9 +171,9 @@ impl BrokerController {
replicas_manager.start();
}

if let Some(ref mut broker_server) = self.broker_server {
broker_server.start().await;
}
/* if let Some(ref mut broker_server) = self.broker_server {
//broker_server.start();
};*/

//other service start
}
Expand Down Expand Up @@ -245,7 +250,30 @@ impl BrokerController {

fn initialize_resources(&mut self) {}

fn register_processor(&mut self) {}
fn register_processor(&mut self) {
let broker_server = self.broker_server.as_mut().unwrap();
let send_message_processor = Arc::new(SendMessageProcessor::default());
broker_server.register_processor(
RequestCode::SendMessage,
send_message_processor.clone(),
self.send_message_executor.clone(),
);
broker_server.register_processor(
RequestCode::SendMessageV2,
send_message_processor.clone(),
self.send_message_executor.clone(),
);
broker_server.register_processor(
RequestCode::SendBatchMessage,
send_message_processor.clone(),
self.send_message_executor.clone(),
);
broker_server.register_processor(
RequestCode::ConsumerSendMsgBack,
send_message_processor,
self.send_message_executor.clone(),
);
}

fn initialize_scheduled_tasks(&mut self) {}

Expand Down
7 changes: 3 additions & 4 deletions rocketmq-broker/src/processor/ack_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;

use rocketmq_remoting::{
protocol::remoting_command::RemotingCommand, runtime::processor::RequestProcessor,
protocol::remoting_command::RemotingCommand,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

#[derive(Default)]
Expand All @@ -26,7 +25,7 @@ pub struct AckMessageProcessor {}
impl RequestProcessor for AckMessageProcessor {
fn process_request(
&mut self,
_remote_addr: SocketAddr,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_remoting::runtime::processor::RequestProcessor;
use rocketmq_remoting::runtime::{processor::RequestProcessor, server::ConnectionHandlerContext};

#[derive(Default)]
pub struct ChangeInvisibleTimeProcessor {}

impl RequestProcessor for ChangeInvisibleTimeProcessor {
fn process_request(
&mut self,
_remote_addr: std::net::SocketAddr,
_ctx: ConnectionHandlerContext,
_request: rocketmq_remoting::protocol::remoting_command::RemotingCommand,
) -> rocketmq_remoting::protocol::remoting_command::RemotingCommand {
todo!()
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/processor/notification_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_remoting::runtime::processor::RequestProcessor;
use rocketmq_remoting::runtime::{processor::RequestProcessor, server::ConnectionHandlerContext};

#[derive(Default)]
pub struct NotificationProcessor {}

impl RequestProcessor for NotificationProcessor {
fn process_request(
&mut self,
_remote_addr: std::net::SocketAddr,
_ctx: ConnectionHandlerContext,
_request: rocketmq_remoting::protocol::remoting_command::RemotingCommand,
) -> rocketmq_remoting::protocol::remoting_command::RemotingCommand {
todo!()
Expand Down
7 changes: 3 additions & 4 deletions rocketmq-broker/src/processor/peek_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;

use rocketmq_remoting::{
protocol::remoting_command::RemotingCommand, runtime::processor::RequestProcessor,
protocol::remoting_command::RemotingCommand,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

#[derive(Default)]
Expand All @@ -26,7 +25,7 @@ pub struct PeekMessageProcessor {}
impl RequestProcessor for PeekMessageProcessor {
fn process_request(
&mut self,
_remote_addr: SocketAddr,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
Expand Down
7 changes: 3 additions & 4 deletions rocketmq-broker/src/processor/polling_info_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;

use rocketmq_remoting::{
protocol::remoting_command::RemotingCommand, runtime::processor::RequestProcessor,
protocol::remoting_command::RemotingCommand,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

#[derive(Default)]
Expand All @@ -26,7 +25,7 @@ pub struct PollingInfoProcessor {}
impl RequestProcessor for PollingInfoProcessor {
fn process_request(
&mut self,
_remote_addr: SocketAddr,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
Expand Down
7 changes: 3 additions & 4 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;

use rocketmq_remoting::{
protocol::remoting_command::RemotingCommand, runtime::processor::RequestProcessor,
protocol::remoting_command::RemotingCommand,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

#[derive(Default)]
Expand All @@ -26,7 +25,7 @@ pub struct PopMessageProcessor {}
impl RequestProcessor for PopMessageProcessor {
fn process_request(
&mut self,
_remote_addr: SocketAddr,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
Expand Down
7 changes: 3 additions & 4 deletions rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;

use rocketmq_remoting::{
protocol::remoting_command::RemotingCommand, runtime::processor::RequestProcessor,
protocol::remoting_command::RemotingCommand,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

#[derive(Default)]
Expand All @@ -26,7 +25,7 @@ pub struct PullMessageProcessor {}
impl RequestProcessor for PullMessageProcessor {
fn process_request(
&mut self,
_remote_addr: SocketAddr,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/processor/reply_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_remoting::runtime::processor::RequestProcessor;
use rocketmq_remoting::runtime::{processor::RequestProcessor, server::ConnectionHandlerContext};

#[derive(Default)]
pub struct ReplyMessageProcessor {}

impl RequestProcessor for ReplyMessageProcessor {
fn process_request(
&mut self,
_remote_addr: std::net::SocketAddr,
_ctx: ConnectionHandlerContext,
_request: rocketmq_remoting::protocol::remoting_command::RemotingCommand,
) -> rocketmq_remoting::protocol::remoting_command::RemotingCommand {
todo!()
Expand Down
7 changes: 3 additions & 4 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;

use rocketmq_remoting::{
protocol::remoting_command::RemotingCommand, runtime::processor::RequestProcessor,
protocol::remoting_command::RemotingCommand,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

#[derive(Default)]
Expand All @@ -26,7 +25,7 @@ pub struct SendMessageProcessor {}
impl RequestProcessor for SendMessageProcessor {
fn process_request(
&mut self,
_remote_addr: SocketAddr,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
Expand Down
26 changes: 19 additions & 7 deletions rocketmq-common/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@

use std::fmt;

pub use faq::FAQUrl;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub use crate::common::sys_flag::topic_sys_flag as TopicSysFlag;

pub mod attribute;
pub mod boundary_type;
pub mod broker;
pub mod config;
pub mod config_manager;
pub mod constant;
mod faq;
pub use faq::FAQUrl;
pub mod message;
pub mod mix_all;
pub mod mq_version;
pub mod namesrv;
mod sys_flag;
pub use crate::common::sys_flag::topic_sys_flag as TopicSysFlag;
pub mod attribute;
pub mod boundary_type;
pub mod broker;
pub mod config_manager;
pub mod message;
pub mod topic;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -86,3 +87,14 @@ impl<'de> Deserialize<'de> for TopicFilterType {
deserializer.deserialize_str(TopicFilterTypeVisitor)
}
}

pub struct Pair<T, U> {
pub left: T,
pub right: U,
}

impl<T, U> Pair<T, U> {
pub fn new(left: T, right: U) -> Self {
Self { left, right }
}
}
2 changes: 1 addition & 1 deletion rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use serde::Deserialize;

use crate::common::{mix_all, topic::TopicValidator};
use crate::common::mix_all;

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down
13 changes: 7 additions & 6 deletions rocketmq-common/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
* limitations under the License.
*/

use std::{
collections::{HashMap, HashSet},
string::ToString,
};

use lazy_static::lazy_static;

pub mod message_batch;
pub mod message_id;
pub mod message_queue;
pub mod message_single;

use std::collections::HashMap;

pub trait MessageTrait {
fn get_topic(&self) -> &str;

Expand Down Expand Up @@ -99,10 +104,6 @@ impl MessageVersion {
}
}

use std::{collections::HashSet, string::ToString};

use lazy_static::lazy_static;

pub struct MessageConst;

impl MessageConst {
Expand Down
17 changes: 10 additions & 7 deletions rocketmq-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
#![allow(dead_code)]
#![allow(unused_imports)]

pub use crate::thread_pool::{
FuturesExecutorService, FuturesExecutorServiceBuilder, ScheduledExecutorService,
TokioExecutorService,
pub use crate::{
thread_pool::{
FuturesExecutorService, FuturesExecutorServiceBuilder, ScheduledExecutorService,
TokioExecutorService,
},
utils::{
crc32_utils as CRC32Utils, env_utils as EnvUtils, file_utils as FileUtils,
parse_config_file as ParseConfigFile, time_utils as TimeUtils,
},
};

pub mod common;
pub mod log;
mod thread_pool;
pub mod utils;
pub use crate::utils::{
crc32_utils as CRC32Utils, env_utils as EnvUtils, file_utils as FileUtils,
parse_config_file as ParseConfigFile, time_utils as TimeUtils,
};

#[cfg(test)]
mod tests {}
Loading