diff --git a/Cargo.lock b/Cargo.lock index 229a0d00..d0be7fbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,6 +427,18 @@ dependencies = [ "const-random", ] +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "windows-sys 0.48.0", +] + [[package]] name = "downcast" version = "0.11.0" @@ -1263,6 +1275,7 @@ dependencies = [ "clap", "config", "dirs", + "dns-lookup", "env_logger", "futures", "futures-core", @@ -1270,7 +1283,6 @@ dependencies = [ "futures-sink", "futures-util", "local-ip-address", - "log", "mockall", "parking_lot", "rand", @@ -1402,6 +1414,7 @@ dependencies = [ "rand", "rocketmq-common", "rocketmq-macros", + "rocketmq-runtime", "serde", "serde_json", "thiserror", diff --git a/rocketmq-broker/Cargo.toml b/rocketmq-broker/Cargo.toml index 1d877601..5ff4775d 100644 --- a/rocketmq-broker/Cargo.toml +++ b/rocketmq-broker/Cargo.toml @@ -55,7 +55,7 @@ rand = "0.8.5" dirs.workspace = true local-ip-address = "0.6.1" -log = "0.4.21" +dns-lookup = "2.0" [dev-dependencies] mockall = "0.12.1" diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index e5e39031..b1008d4b 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -450,6 +450,30 @@ impl BrokerRuntime { if self.broker_config.enable_controller_mode { self.update_master_haserver_addr_periodically = true; } + + if let Some(ref namesrv_address) = self.broker_config.namesrv_addr.clone() { + self.update_namesrv_addr(); + info!( + "Set user specified name server address: {}", + namesrv_address + ); + let mut broker_runtime = self.clone(); + self.broker_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + tokio::time::sleep(Duration::from_secs(10)).await; + loop { + let current_execution_time = tokio::time::Instant::now(); + broker_runtime.update_namesrv_addr(); + let next_execution_time = current_execution_time + Duration::from_secs(60); + let delay = next_execution_time + .saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + } } fn initial_transaction(&mut self) {} @@ -475,6 +499,18 @@ impl BrokerRuntime { tokio::spawn(async move { server.run(request_processor).await }); } + fn update_namesrv_addr(&mut self) { + if self.broker_config.fetch_name_srv_addr_by_dns_lookup { + if let Some(namesrv_addr) = &self.broker_config.namesrv_addr { + self.broker_out_api + .update_name_server_address_list_by_dns_lookup(namesrv_addr.clone()); + } + } else if let Some(namesrv_addr) = &self.broker_config.namesrv_addr { + self.broker_out_api + .update_name_server_address_list(namesrv_addr.clone()); + } + } + pub async fn start(&mut self) { self.should_start_time.store( (get_current_millis() as i64 + self.message_store_config.disappear_time_after_start) @@ -572,7 +608,7 @@ impl BrokerRuntime { tokio::time::sleep(delay).await; } }); - log::info!( + info!( "Rocketmq Broker({} ----Rust) start success", self.broker_config.broker_identity.broker_name ); diff --git a/rocketmq-broker/src/out_api/broker_outer_api.rs b/rocketmq-broker/src/out_api/broker_outer_api.rs index 67882a24..12c2a98b 100644 --- a/rocketmq-broker/src/out_api/broker_outer_api.rs +++ b/rocketmq-broker/src/out_api/broker_outer_api.rs @@ -16,6 +16,7 @@ */ use std::sync::Arc; +use dns_lookup::lookup_host; use rocketmq_common::common::broker::broker_config::BrokerIdentity; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::utils::crc32_utils; @@ -34,6 +35,8 @@ use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::remoting::RemotingService; use rocketmq_remoting::runtime::config::client_config::TokioClientConfig; use rocketmq_remoting::runtime::RPCHook; +use tracing::error; +use tracing::info; #[derive(Clone)] pub struct BrokerOuterAPI { @@ -99,6 +102,12 @@ impl BrokerOuterAPI { .update_name_server_address_list(addr_vec) } + pub fn update_name_server_address_list_by_dns_lookup(&self, domain: String) { + let address_list = dns_lookup_address_by_domain(domain.as_str()); + self.remoting_client + .update_name_server_address_list(address_list); + } + pub async fn register_broker_all( &self, cluster_name: String, @@ -225,3 +234,62 @@ impl BrokerOuterAPI { pub fn refresh_metadata(&self) {} } + +fn dns_lookup_address_by_domain(domain: &str) -> Vec { + let mut address_list = Vec::new(); + // Ensure logging is initialized + + match domain.find(':') { + Some(index) => { + let (domain_str, port_str) = domain.split_at(index); + match lookup_host(domain_str) { + Ok(addresses) => { + for address in addresses { + address_list.push(format!("{}{}", address, port_str)); + } + info!( + "DNS lookup address by domain success, domain={}, result={:?}", + domain, address_list + ); + } + Err(e) => { + error!( + "DNS lookup address by domain error, domain={}, error={}", + domain, e + ); + } + } + } + None => { + error!("Invalid domain format, missing port: {}", domain); + } + } + + address_list +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dns_lookup_address_by_domain_returns_correct_addresses() { + let domain = "localhost:8080"; + let addresses = dns_lookup_address_by_domain(domain); + assert!(addresses.contains(&"127.0.0.1:8080".to_string())); + } + + #[test] + fn dns_lookup_address_by_domain_handles_invalid_domain() { + let domain = "invalid_domain"; + let addresses = dns_lookup_address_by_domain(domain); + assert!(addresses.is_empty()); + } + + #[test] + fn dns_lookup_address_by_domain_handles_domain_without_port() { + let domain = "localhost"; + let addresses = dns_lookup_address_by_domain(domain); + assert!(addresses.is_empty()); + } +} diff --git a/rocketmq-common/src/common/broker/broker_config.rs b/rocketmq-common/src/common/broker/broker_config.rs index 584b17c8..c6ff3f62 100644 --- a/rocketmq-common/src/common/broker/broker_config.rs +++ b/rocketmq-common/src/common/broker/broker_config.rs @@ -20,6 +20,7 @@ use serde::Deserialize; use crate::common::constant::PermName; use crate::common::mix_all; +use crate::common::mix_all::NAMESRV_ADDR_PROPERTY; use crate::common::server::config::ServerConfig; use crate::common::topic::TopicValidator; @@ -34,6 +35,8 @@ lazy_static! { None } }; + pub static ref NAMESRV_ADDR: Option = + std::env::var(NAMESRV_ADDR_PROPERTY).map_or(Some("127.0.0.1:9876".to_string()), Some); } #[derive(Debug, Default, Deserialize)] @@ -137,6 +140,8 @@ pub struct BrokerConfig { pub force_register: bool, pub register_name_server_period: u64, pub skip_pre_online: bool, + pub namesrv_addr: Option, + pub fetch_name_srv_addr_by_dns_lookup: bool, } impl Default for BrokerConfig { @@ -188,6 +193,8 @@ impl Default for BrokerConfig { force_register: true, register_name_server_period: 1000 * 30, skip_pre_online: false, + namesrv_addr: NAMESRV_ADDR.clone(), + fetch_name_srv_addr_by_dns_lookup: false, } } } diff --git a/rocketmq-remoting/Cargo.toml b/rocketmq-remoting/Cargo.toml index 0da70b47..2798dd9e 100644 --- a/rocketmq-remoting/Cargo.toml +++ b/rocketmq-remoting/Cargo.toml @@ -14,6 +14,7 @@ readme = "README.md" [dependencies] rocketmq-common = { workspace = true } rocketmq-macros = { workspace = true } +rocketmq-runtime = { workspace = true } anyhow.workspace = true bytes.workspace = true diff --git a/rocketmq-remoting/src/clients.rs b/rocketmq-remoting/src/clients.rs index 15f0c03c..366b714d 100644 --- a/rocketmq-remoting/src/clients.rs +++ b/rocketmq-remoting/src/clients.rs @@ -91,7 +91,7 @@ pub trait RemotingClient: RemotingService { addr: String, request: RemotingCommand, timeout_millis: u64, - ) -> RemotingCommand; + ) -> Result; async fn invoke_async( &mut self, diff --git a/rocketmq-remoting/src/clients/client.rs b/rocketmq-remoting/src/clients/client.rs index b8869729..b0988620 100644 --- a/rocketmq-remoting/src/clients/client.rs +++ b/rocketmq-remoting/src/clients/client.rs @@ -44,7 +44,7 @@ impl Client { /// A new `Client` instance wrapped in a `Result`. Returns an error if the connection fails. pub async fn connect(addr: T) -> anyhow::Result { let tcp_stream = tokio::net::TcpStream::connect(addr).await?; - let socket_addr = tcp_stream.peer_addr().unwrap(); + let socket_addr = tcp_stream.peer_addr()?; Ok(Client { connection: Connection::new(tcp_stream, socket_addr), }) diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index da52b670..d26bc5c7 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -15,13 +15,18 @@ * limitations under the License. */ use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use rocketmq_common::TokioExecutorService; use tokio::runtime::Handle; +use tokio::task; use tokio::time; +use tracing::debug; +use tracing::error; use tracing::info; +use tracing::warn; use crate::clients::Client; use crate::clients::RemotingClient; @@ -38,10 +43,10 @@ pub struct RocketmqDefaultClient { //cache connection connection_tables: Arc>>>>, - namesrv_addr_list: Arc>>, + namesrv_addr_list: Arc>>, namesrv_addr_choosed: Arc>>, + available_namesrv_addr_set: Arc>>, } - impl RocketmqDefaultClient { pub fn new(tokio_client_config: Arc) -> Self { Self { @@ -49,33 +54,83 @@ impl RocketmqDefaultClient { connection_tables: Arc::new(parking_lot::Mutex::new(Default::default())), namesrv_addr_list: Arc::new(Default::default()), namesrv_addr_choosed: Arc::new(Default::default()), + available_namesrv_addr_set: Arc::new(Default::default()), } } } impl RocketmqDefaultClient { - fn get_and_create_client(&self, addr: String) -> Arc> { - let mut mutex_guard = self.connection_tables.lock(); - if mutex_guard.contains_key(&addr) { - return mutex_guard.get(&addr).unwrap().clone(); + fn get_and_create_client(&self, addr: String) -> Option>> { + let mut connection_tables = self.connection_tables.lock(); + match connection_tables.get(&addr) { + None => { + let addr_inner = addr.clone(); + let handle = Handle::current(); + + match std::thread::spawn(move || { + handle.block_on(async move { Client::connect(addr_inner).await }) + }) + .join() + { + Ok(client_inner) => match client_inner { + Ok(client_r) => { + let client = Arc::new(tokio::sync::Mutex::new(client_r)); + connection_tables.insert(addr, client.clone()); + Some(client) + } + Err(_) => { + error!("getAndCreateClient connect to {} failed", addr); + None + } + }, + Err(_) => { + error!("getAndCreateClient connect to {} failed", addr); + None + } + } + } + Some(conn) => Some(conn.clone()), } + } - let addr_inner = addr.clone(); - let handle = Handle::current(); - let client = std::thread::spawn(move || { - handle.block_on(async move { Client::connect(addr_inner).await.unwrap() }) - }) - .join() - .unwrap(); - // let client = Client::connect(addr_inner).await.unwrap(); - mutex_guard.insert(addr.clone(), Arc::new(tokio::sync::Mutex::new(client))); - mutex_guard.get(&addr).unwrap().clone() + fn scan_available_name_srv(&self) { + if self.namesrv_addr_list.read().is_empty() { + debug!("scanAvailableNameSrv addresses of name server is null!"); + return; + } + for address in self.available_namesrv_addr_set.read().iter() { + if !self.namesrv_addr_list.read().contains(address) { + warn!("scanAvailableNameSrv remove invalid address {}", address); + self.available_namesrv_addr_set.write().remove(address); + } + } + for namesrv_addr in self.namesrv_addr_list.read().iter() { + let client = self.get_and_create_client(namesrv_addr.clone()); + match client { + None => { + self.available_namesrv_addr_set.write().remove(namesrv_addr); + } + Some(_) => { + self.available_namesrv_addr_set + .write() + .insert(namesrv_addr.clone()); + } + } + } } } #[allow(unused_variables)] impl RemotingService for RocketmqDefaultClient { - async fn start(&self) {} + async fn start(&self) { + let client = self.clone(); + let handle = task::spawn(async move { + loop { + client.scan_available_name_srv(); + time::sleep(Duration::from_millis(1)).await; + } + }); + } fn shutdown(&mut self) { todo!() @@ -93,7 +148,7 @@ impl RemotingService for RocketmqDefaultClient { #[allow(unused_variables)] impl RemotingClient for RocketmqDefaultClient { fn update_name_server_address_list(&self, addrs: Vec) { - let mut old = self.namesrv_addr_list.lock(); + let mut old = self.namesrv_addr_list.write(); let mut update = false; if !addrs.is_empty() { @@ -139,13 +194,15 @@ impl RemotingClient for RocketmqDefaultClient { } fn get_name_server_address_list(&self) -> Vec { - /*let cloned = self.namesrv_addr_list.clone(); - Handle::current().block_on(async move { cloned.lock().await.clone() })*/ - self.namesrv_addr_list.lock().clone() + self.namesrv_addr_list.read().clone() } fn get_available_name_srv_list(&self) -> Vec { - vec!["127.0.0.1:9876".to_string()] + self.available_namesrv_addr_set + .read() + .clone() + .into_iter() + .collect() } fn invoke_sync( @@ -153,22 +210,24 @@ impl RemotingClient for RocketmqDefaultClient { addr: String, request: RemotingCommand, timeout_millis: u64, - ) -> RemotingCommand { - let client = self.get_and_create_client(addr.clone()); + ) -> Result { + let client = self.get_and_create_client(addr.clone()).unwrap(); let handle = Handle::current(); - std::thread::spawn(move || { - handle.block_on(async move { client.lock().await.send_read(request).await.unwrap() }) + match std::thread::spawn(move || { + handle.block_on(async move { client.lock().await.send_read(request).await }) }) .join() - .unwrap() - - /*if let Ok(result) = timeout(Duration::from_millis(timeout_millis), async { - client.lock().await.send_read(request).await.unwrap() - }) { - result - } else { - RemotingCommand::create_response_command() - }*/ + { + Ok(cmd) => match cmd { + Ok(cmd_inner) => Ok(cmd_inner), + Err(_) => Err(RemotingError::RemoteException( + "invoke sync error".to_string(), + )), + }, + Err(_) => Err(RemotingError::RemoteException( + "invoke sync error".to_string(), + )), + } } async fn invoke_async( @@ -177,7 +236,7 @@ impl RemotingClient for RocketmqDefaultClient { request: RemotingCommand, timeout_millis: u64, ) -> Result { - let client = self.get_and_create_client(addr.clone()); + let client = self.get_and_create_client(addr.clone()).unwrap(); match time::timeout(Duration::from_millis(timeout_millis), async move { client.lock().await.send_read(request).await }) @@ -197,12 +256,7 @@ impl RemotingClient for RocketmqDefaultClient { request: RemotingCommand, timeout_millis: u64, ) -> Result<(), RemotingError> { - let client = self.get_and_create_client(addr.clone()); - /* let _ = time::timeout(Duration::from_millis(timeout_millis), async move { - client.lock().await.send(request).await.unwrap() - }) - .await;*/ - + let client = self.get_and_create_client(addr.clone()).unwrap(); tokio::spawn(async move { match time::timeout(Duration::from_millis(timeout_millis), async move { client.lock().await.send(request).await.unwrap()