diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 0c7f5fe41..6adca4c11 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -895,6 +895,7 @@ impl BrokerRuntime { self.broker_config.broker_ip1, self.server_config.listen_port ); let broker_id = self.broker_config.broker_identity.broker_id; + let weak = Arc::downgrade(&self.broker_out_api); self.broker_out_api .register_broker_all( cluster_name, @@ -910,6 +911,7 @@ impl BrokerRuntime { false, None, Default::default(), + weak, ) .await; } @@ -1012,6 +1014,7 @@ impl BrokerRuntimeInner { self.broker_config.broker_ip1, self.server_config.listen_port ); let broker_id = self.broker_config.broker_identity.broker_id; + let weak = Arc::downgrade(&self.broker_out_api); self.broker_out_api .register_broker_all( cluster_name, @@ -1027,6 +1030,7 @@ impl BrokerRuntimeInner { false, None, Default::default(), + weak, ) .await; } diff --git a/rocketmq-broker/src/out_api/broker_outer_api.rs b/rocketmq-broker/src/out_api/broker_outer_api.rs index 0e1ef0345..a6505d878 100644 --- a/rocketmq-broker/src/out_api/broker_outer_api.rs +++ b/rocketmq-broker/src/out_api/broker_outer_api.rs @@ -15,12 +15,14 @@ * limitations under the License. */ use std::sync::Arc; +use std::sync::Weak; use dns_lookup::lookup_host; use rocketmq_common::common::broker::broker_config::BrokerIdentity; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::utils::crc32_utils; use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient; use rocketmq_remoting::clients::RemotingClient; use rocketmq_remoting::code::request_code::RequestCode; @@ -46,9 +48,8 @@ use tracing::debug; use tracing::error; use tracing::info; -#[derive(Clone)] pub struct BrokerOuterAPI { - remoting_client: RocketmqDefaultClient, + remoting_client: ArcRefCellWrapper>, name_server_address: Option, rpc_client: RpcClientImpl, client_metadata: ClientMetadata, @@ -56,8 +57,10 @@ pub struct BrokerOuterAPI { impl BrokerOuterAPI { pub fn new(tokio_client_config: Arc) -> Self { - let client = - RocketmqDefaultClient::new(tokio_client_config, DefaultRemotingRequestProcessor); + let client = ArcRefCellWrapper::new(RocketmqDefaultClient::new( + tokio_client_config, + DefaultRemotingRequestProcessor, + )); let client_metadata = ClientMetadata::new(); Self { remoting_client: client.clone(), @@ -71,8 +74,10 @@ impl BrokerOuterAPI { tokio_client_config: Arc, rpc_hook: Option>>, ) -> Self { - let mut client = - RocketmqDefaultClient::new(tokio_client_config, DefaultRemotingRequestProcessor); + let mut client = ArcRefCellWrapper::new(RocketmqDefaultClient::new( + tokio_client_config, + DefaultRemotingRequestProcessor, + )); let client_metadata = ClientMetadata::new(); if let Some(rpc_hook) = rpc_hook { client.register_rpc_hook(rpc_hook); @@ -108,7 +113,8 @@ impl BrokerOuterAPI { impl BrokerOuterAPI { pub async fn start(&self) { - self.remoting_client.start().await; + let wrapper = ArcRefCellWrapper::downgrade(&self.remoting_client); + self.remoting_client.start(wrapper).await; } pub async fn update_name_server_address_list(&self, addrs: String) { @@ -143,6 +149,7 @@ impl BrokerOuterAPI { compressed: bool, heartbeat_timeout_millis: Option, _broker_identity: BrokerIdentity, + this: Weak, ) -> Vec { let name_server_address_list = self.remoting_client.get_available_name_srv_list(); let mut register_broker_result_list = Vec::new(); @@ -173,11 +180,21 @@ impl BrokerOuterAPI { let cloned_body = body.clone(); let cloned_header = request_header.clone(); let addr = namesrv_addr.clone(); - let outer_api = self.clone(); + let outer_api = this.clone(); let join_handle = tokio::spawn(async move { - outer_api - .register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body) - .await + if let Some(outer_api) = outer_api.upgrade() { + outer_api + .register_broker( + addr, + oneway, + timeout_mills, + cloned_header, + cloned_body, + ) + .await + } else { + None + } }); /*let handle = self.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body);*/ diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 1b57d4e41..5c4033779 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -93,7 +93,7 @@ lazy_static! { } pub struct MQClientAPIImpl { - remoting_client: RocketmqDefaultClient, + remoting_client: ArcRefCellWrapper>, top_addressing: Box, // client_remoting_processor: ClientRemotingProcessor, name_srv_addr: Option, @@ -121,7 +121,7 @@ impl MQClientAPIImpl { } MQClientAPIImpl { - remoting_client: default_client, + remoting_client: ArcRefCellWrapper::new(default_client), top_addressing: Box::new(DefaultTopAddressing::new( mix_all::get_ws_addr(), client_config.unit_name.clone(), @@ -133,7 +133,8 @@ impl MQClientAPIImpl { } pub async fn start(&self) { - self.remoting_client.start().await; + let client = ArcRefCellWrapper::downgrade(&self.remoting_client); + self.remoting_client.start(client).await; } pub async fn fetch_name_server_addr(&mut self) -> Option { diff --git a/rocketmq-namesrv/src/bootstrap.rs b/rocketmq-namesrv/src/bootstrap.rs index df96aafa2..0d4bb095d 100644 --- a/rocketmq-namesrv/src/bootstrap.rs +++ b/rocketmq-namesrv/src/bootstrap.rs @@ -20,6 +20,7 @@ use std::time::Duration; use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig; use rocketmq_common::common::server::config::ServerConfig; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient; use rocketmq_remoting::remoting_server::server::RocketMQServer; use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor; @@ -49,7 +50,7 @@ struct NameServerRuntime { route_info_manager: Arc>, kvconfig_manager: Arc>, name_server_runtime: Option, - remoting_client: RocketmqDefaultClient, + remoting_client: ArcRefCellWrapper, } impl NameServerBootstrap { @@ -142,10 +143,10 @@ impl Builder { let name_server_config = Arc::new(self.name_server_config.unwrap()); let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread"); let tokio_client_config = Arc::new(TokioClientConfig::default()); - let remoting_client = RocketmqDefaultClient::new( + let remoting_client = ArcRefCellWrapper::new(RocketmqDefaultClient::new( tokio_client_config.clone(), DefaultRemotingRequestProcessor, - ); + )); NameServerBootstrap { name_server_runtime: NameServerRuntime { @@ -154,7 +155,7 @@ impl Builder { server_config: Arc::new(self.server_config.unwrap()), route_info_manager: Arc::new(parking_lot::RwLock::new(RouteInfoManager::new( name_server_config.clone(), - Arc::new(remoting_client.clone()), + remoting_client.clone(), ))), kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new( name_server_config, diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index e01b48792..c2f7cb63b 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -27,6 +27,7 @@ use rocketmq_common::common::mix_all; use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig; use rocketmq_common::common::topic::TopicValidator; use rocketmq_common::common::TopicSysFlag; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_common::TimeUtils; use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient; use rocketmq_remoting::clients::RemotingClient; @@ -73,14 +74,14 @@ pub struct RouteInfoManager { pub(crate) filter_server_table: FilterServerTable, pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable, pub(crate) namesrv_config: Arc, - pub(crate) remoting_client: Arc, + pub(crate) remoting_client: ArcRefCellWrapper, } #[allow(private_interfaces)] impl RouteInfoManager { pub fn new( namesrv_config: Arc, - remoting_client: Arc, + remoting_client: ArcRefCellWrapper, ) -> Self { RouteInfoManager { topic_queue_table: HashMap::new(), diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index 5a09123e4..776329da5 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -22,6 +22,7 @@ use std::time::Duration; use rand::Rng; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::WeakCellWrapper; use rocketmq_runtime::RocketMQRuntime; use tokio::sync::Mutex; use tokio::time; @@ -46,7 +47,6 @@ const LOCK_TIMEOUT_MILLIS: u64 = 3000; pub type ArcSyncClient = Arc>; -#[derive(Clone)] pub struct RocketmqDefaultClient { tokio_client_config: Arc, //cache connection @@ -239,22 +239,16 @@ impl RocketmqDefaultClient { #[allow(unused_variables)] impl RemotingService for RocketmqDefaultClient { - async fn start(&self) { - let client = self.clone(); - //invoke scan available name sever now - client.scan_available_name_srv().await; - /*let handle = task::spawn(async move { - loop { - time::sleep(Duration::from_millis(1)).await; - client.scan_available_name_srv().await; - } - });*/ - self.client_runtime.get_handle().spawn(async move { - loop { - time::sleep(Duration::from_millis(1)).await; - client.scan_available_name_srv().await; - } - }); + async fn start(&self, this: WeakCellWrapper) { + if let Some(client) = this.upgrade() { + client.scan_available_name_srv().await; + self.client_runtime.get_handle().spawn(async move { + loop { + time::sleep(Duration::from_millis(1)).await; + client.scan_available_name_srv().await; + } + }); + } } fn shutdown(&mut self) { diff --git a/rocketmq-remoting/src/remoting.rs b/rocketmq-remoting/src/remoting.rs index e9713ddab..04beb36fa 100644 --- a/rocketmq-remoting/src/remoting.rs +++ b/rocketmq-remoting/src/remoting.rs @@ -16,6 +16,8 @@ */ use std::sync::Arc; +use rocketmq_common::WeakCellWrapper; + use crate::base::response_future::ResponseFuture; use crate::protocol::remoting_command::RemotingCommand; use crate::runtime::RPCHook; @@ -36,7 +38,7 @@ pub trait RemotingService: Send { /// This function should initialize and start the service, making it ready to handle incoming /// or outgoing remote procedure calls. The exact implementation details, such as opening /// network connections or preparing internal state, are left to the implementor. - async fn start(&self); + async fn start(&self, this: WeakCellWrapper); /// Shuts down the remoting service. /// diff --git a/rocketmq-remoting/src/rpc/rpc_client_impl.rs b/rocketmq-remoting/src/rpc/rpc_client_impl.rs index 28dd6c907..774ed32cf 100644 --- a/rocketmq-remoting/src/rpc/rpc_client_impl.rs +++ b/rocketmq-remoting/src/rpc/rpc_client_impl.rs @@ -17,6 +17,7 @@ use std::any::Any; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; use crate::clients::rocketmq_default_impl::RocketmqDefaultClient; use crate::clients::RemotingClient; @@ -32,6 +33,7 @@ use crate::protocol::header::pull_message_response_header::PullMessageResponseHe use crate::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader; use crate::protocol::header::search_offset_response_header::SearchOffsetResponseHeader; use crate::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetResponseHeader; +use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor; use crate::rpc::client_metadata::ClientMetadata; use crate::rpc::rpc_client::RpcClient; use crate::rpc::rpc_client_hook::RpcClientHookFn; @@ -40,15 +42,17 @@ use crate::rpc::rpc_request::RpcRequest; use crate::rpc::rpc_response::RpcResponse; use crate::Result; -#[derive(Clone)] pub struct RpcClientImpl { client_metadata: ClientMetadata, - remoting_client: RocketmqDefaultClient, + remoting_client: ArcRefCellWrapper>, client_hook_list: Vec, } impl RpcClientImpl { - pub fn new(client_metadata: ClientMetadata, remoting_client: RocketmqDefaultClient) -> Self { + pub fn new( + client_metadata: ClientMetadata, + remoting_client: ArcRefCellWrapper>, + ) -> Self { RpcClientImpl { client_metadata, remoting_client,