diff --git a/rocketmq-namesrv/src/bin/bootstrap_server.rs b/rocketmq-namesrv/src/bin/bootstrap_server.rs index b6f32b1d..d3160dc5 100644 --- a/rocketmq-namesrv/src/bin/bootstrap_server.rs +++ b/rocketmq-namesrv/src/bin/bootstrap_server.rs @@ -31,7 +31,7 @@ use rocketmq_remoting::{ code::request_code::RequestCode, runtime::{processor::RequestProcessor, server}, }; -use tokio::{net::TcpListener, sync::broadcast}; +use tokio::{net::TcpListener, sync::broadcast, task::JoinHandle}; use tracing::info; #[rocketmq::main] @@ -58,12 +58,12 @@ async fn main() -> anyhow::Result<()> { let listener = TcpListener::bind(&format!("{}:{}", args.ip, args.port)).await?; let config_file = PathBuf::from(home).join("conf").join("namesrv.conf"); let config = parse_command_and_config_file(config_file)?; - let (notify_conn_disconnect, _) = broadcast::channel::(1); - let route_info_manager = - RouteInfoManager::new_with_config(config.clone(), Some(notify_conn_disconnect.subscribe())); + let (notify_conn_disconnect, _) = broadcast::channel::(100); + let receiver = notify_conn_disconnect.subscribe(); + let route_info_manager = RouteInfoManager::new_with_config(config.clone()); let kvconfig_manager = KVConfigManager::new(config.clone()); - let (processor_table, default_request_processor, scheduled_executor_service) = - init_processors(route_info_manager, config, kvconfig_manager); + let (processor_table, default_request_processor, scheduled_executor_service, _handle) = + init_processors(route_info_manager, config, kvconfig_manager, receiver); //run server server::run( listener, @@ -77,16 +77,21 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +type InitProcessorsReturn = ( + HashMap>, + DefaultRequestProcessor, + ScheduledExecutorService, + JoinHandle<()>, +); + fn init_processors( route_info_manager: RouteInfoManager, namesrv_config: NamesrvConfig, kvconfig_manager: KVConfigManager, -) -> ( - HashMap>, - DefaultRequestProcessor, - ScheduledExecutorService, -) { + receiver: broadcast::Receiver, +) -> InitProcessorsReturn { let route_info_manager_inner = Arc::new(parking_lot::RwLock::new(route_info_manager)); + let handle = RouteInfoManager::start(route_info_manager_inner.clone(), receiver); let kvconfig_manager_inner = Arc::new(parking_lot::RwLock::new(kvconfig_manager)); let mut processors: HashMap> = HashMap::new(); @@ -114,6 +119,7 @@ fn init_processors( processors, DefaultRequestProcessor::new(namesrv_config), scheduled_executor_service, + handle, ) } diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index c41bc2bf..002d0b40 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -18,6 +18,7 @@ use std::{ collections::{HashMap, HashSet}, net::SocketAddr, + sync::Arc, time::{Duration, SystemTime}, }; @@ -48,7 +49,7 @@ use rocketmq_remoting::{ DataVersion, }, }; -use tokio::sync::broadcast; +use tokio::{sync::broadcast, task::JoinHandle}; use tracing::{debug, info, warn}; use crate::route_info::broker_addr_info::{BrokerAddrInfo, BrokerLiveInfo, BrokerStatusChangeInfo}; @@ -75,7 +76,6 @@ pub struct RouteInfoManager { pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable, pub(crate) namesrv_config: NamesrvConfig, pub(crate) remote_client: RemoteClient, - pub(crate) connect_disconnected_rx: Option>, } #[allow(private_interfaces)] @@ -84,10 +84,7 @@ impl RouteInfoManager { Self::default() } - pub fn new_with_config( - namesrv_config: NamesrvConfig, - connect_disconnected_rx: Option>, - ) -> Self { + pub fn new_with_config(namesrv_config: NamesrvConfig) -> Self { RouteInfoManager { topic_queue_table: HashMap::new(), broker_addr_table: HashMap::new(), @@ -97,7 +94,6 @@ impl RouteInfoManager { topic_queue_mapping_info_table: HashMap::new(), namesrv_config, remote_client: RemoteClient::new(), - connect_disconnected_rx, } } } @@ -1083,4 +1079,44 @@ impl RouteInfoManager { .unwrap() > 0 } + + pub fn connection_disconnected(&mut self, socket_addr: SocketAddr) { + let mut broker_addr_info = None; + for (bai, bli) in &self.broker_live_table { + if bli.remote_addr == socket_addr { + broker_addr_info = Some(bai.clone()); + break; + } + } + if let Some(bai) = broker_addr_info { + let mut request_header = UnRegisterBrokerRequestHeader::default(); + let need_un_register = self.setup_un_register_request(&mut request_header, &bai); + if need_un_register { + self.un_register_broker(vec![request_header]); + } + } + } +} + +// Non-instance method implementations +impl RouteInfoManager { + /// start client connection disconnected listener + pub fn start( + route_info_manager: Arc>, + receiver: broadcast::Receiver, + ) -> JoinHandle<()> { + let mut receiver = receiver; + tokio::spawn(async move { + loop { + match receiver.recv().await { + Ok(socket_addr) => { + route_info_manager + .write() + .connection_disconnected(socket_addr); + } + Err(_err) => {} + } + } + }) + } } diff --git a/rocketmq-remoting/src/runtime/server.rs b/rocketmq-remoting/src/runtime/server.rs index c4f35395..26f5fd2b 100644 --- a/rocketmq-remoting/src/runtime/server.rs +++ b/rocketmq-remoting/src/runtime/server.rs @@ -51,6 +51,19 @@ pub struct ConnectionHandler { conn_disconnect_notify: Option>, } +impl Drop for ConnectionHandler { + fn drop(&mut self) { + if let Some(ref sender) = self.conn_disconnect_notify { + let socket_addr = self.connection.remote_addr; + warn!( + "connection[{}] disconnected, Send notify message.", + socket_addr + ); + let _ = sender.send(socket_addr); + } + } +} + impl ConnectionHandler { async fn handle(&mut self) -> anyhow::Result<()> { let remote_addr = self.connection.remote_addr; @@ -161,9 +174,9 @@ impl ConnectionListener { "The client[IP={}] disconnected from the server.", remote_addr ); - if let Some(ref sender) = handler.conn_disconnect_notify { + /* if let Some(ref sender) = handler.conn_disconnect_notify { let _ = sender.send(remote_addr); - } + }*/ drop(permit); drop(handler); });