From f982c9cfc9a77103fb187b9b20e0267f7b5ef388 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 9 Jun 2024 19:50:10 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[ISSUE=20#469]=F0=9F=9A=80Develop=20BrokerR?= =?UTF-8?q?untime=20initializeBrokerScheduledTasks=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 134 +++++++++++++++++- .../filter/manager/consumer_filter_manager.rs | 2 +- rocketmq-broker/src/lib.rs | 2 +- .../offset/manager/consumer_offset_manager.rs | 2 +- .../manager/consumer_order_info_manager.rs | 2 +- rocketmq-broker/src/util/hook_utils.rs | 4 + .../src/common/broker/broker_config.rs | 2 + rocketmq-common/src/utils/util_all.rs | 20 +++ rocketmq-store/src/log_file.rs | 2 + .../message_store/default_message_store.rs | 2 + 10 files changed, 161 insertions(+), 11 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 6f856dc9..ced9dad6 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -24,9 +24,13 @@ use std::{ time::Duration, }; -use rocketmq_common::common::{ - broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager, - constant::PermName, server::config::ServerConfig, +use rocketmq_common::{ + common::{ + broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager, + constant::PermName, server::config::ServerConfig, + }, + TimeUtils::get_current_millis, + UtilAll::compute_next_morning_time_millis, }; use rocketmq_remoting::{ protocol::{ @@ -37,9 +41,12 @@ use rocketmq_remoting::{ }; use rocketmq_runtime::RocketMQRuntime; use rocketmq_store::{ - base::store_enum::StoreType, config::message_store_config::MessageStoreConfig, - log_file::MessageStore, message_store::default_message_store::DefaultMessageStore, - stats::broker_stats_manager::BrokerStatsManager, timer::timer_message_store::TimerMessageStore, + base::store_enum::StoreType, + config::message_store_config::MessageStoreConfig, + log_file::MessageStore, + message_store::default_message_store::DefaultMessageStore, + stats::{broker_stats::BrokerStats, broker_stats_manager::BrokerStatsManager}, + timer::timer_message_store::TimerMessageStore, }; use tracing::{info, warn}; @@ -83,6 +90,8 @@ pub(crate) struct BrokerRuntime { consumer_order_info_manager: Arc, #[cfg(feature = "local_file_store")] message_store: Option, + #[cfg(feature = "local_file_store")] + broker_stats: Option>>, //message_store: Option>>, schedule_message_service: ScheduleMessageService, timer_message_store: Option, @@ -96,6 +105,7 @@ pub(crate) struct BrokerRuntime { shutdown_hook: Option, broker_stats_manager: Arc, topic_queue_mapping_clean_service: Option>, + update_master_haserver_addr_periodically: bool, } impl Clone for BrokerRuntime { @@ -111,6 +121,7 @@ impl Clone for BrokerRuntime { consumer_filter_manager: Arc::new(Default::default()), consumer_order_info_manager: Arc::new(Default::default()), message_store: self.message_store.clone(), + broker_stats: self.broker_stats.clone(), schedule_message_service: Default::default(), timer_message_store: self.timer_message_store.clone(), broker_out_api: self.broker_out_api.clone(), @@ -121,6 +132,7 @@ impl Clone for BrokerRuntime { shutdown_hook: self.shutdown_hook.clone(), broker_stats_manager: self.broker_stats_manager.clone(), topic_queue_mapping_clean_service: self.topic_queue_mapping_clean_service.clone(), + update_master_haserver_addr_periodically: self.update_master_haserver_addr_periodically, } } } @@ -161,6 +173,7 @@ impl BrokerRuntime { consumer_filter_manager: Arc::new(Default::default()), consumer_order_info_manager: Arc::new(Default::default()), message_store: None, + broker_stats: None, schedule_message_service: Default::default(), timer_message_store: None, broker_out_api: broker_outer_api, @@ -171,6 +184,7 @@ impl BrokerRuntime { shutdown_hook: None, broker_stats_manager, topic_queue_mapping_clean_service: None, + update_master_haserver_addr_periodically: false, } } @@ -256,6 +270,7 @@ impl BrokerRuntime { ); self.topic_config_manager .set_message_store(Some(message_store.clone())); + self.broker_stats = Some(Arc::new(BrokerStats::new(Arc::new(message_store.clone())))); self.message_store = Some(message_store); } else if self.message_store_config.store_type == StoreType::RocksDB { info!("Use RocksDB as message store"); @@ -342,7 +357,110 @@ impl BrokerRuntime { } } - fn initialize_scheduled_tasks(&mut self) {} + fn initialize_scheduled_tasks(&mut self) { + let initial_delay = compute_next_morning_time_millis() - get_current_millis(); + let period = Duration::from_days(1).as_millis() as u64; + let broker_stats = self.broker_stats.clone(); + self.broker_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + info!("BrokerStats Start scheduled task"); + tokio::time::sleep(Duration::from_millis(initial_delay)).await; + loop { + let current_execution_time = tokio::time::Instant::now(); + broker_stats.as_ref().unwrap().record(); + let next_execution_time = + current_execution_time + Duration::from_millis(period); + let delay = + next_execution_time.saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + + let consumer_offset_manager = self.consumer_offset_manager.clone(); + let flush_consumer_offset_interval = self.broker_config.flush_consumer_offset_interval; + self.broker_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + info!("Consumer offset manager Start scheduled task"); + tokio::time::sleep(Duration::from_millis(1000 * 10)).await; + loop { + let current_execution_time = tokio::time::Instant::now(); + consumer_offset_manager.persist(); + let next_execution_time = current_execution_time + + Duration::from_millis(flush_consumer_offset_interval); + let delay = + next_execution_time.saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + + let consumer_filter_manager = self.consumer_filter_manager.clone(); + let consumer_order_info_manager = self.consumer_order_info_manager.clone(); + self.broker_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + info!("consumer filter manager Start scheduled task"); + info!("consumer order info manager Start scheduled task"); + tokio::time::sleep(Duration::from_millis(1000 * 10)).await; + loop { + let current_execution_time = tokio::time::Instant::now(); + consumer_filter_manager.persist(); + consumer_order_info_manager.persist(); + let next_execution_time = + current_execution_time + Duration::from_millis(1000 * 10); + let delay = + next_execution_time.saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + + let mut runtime = self.clone(); + self.broker_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + info!("Protect broker Start scheduled task"); + tokio::time::sleep(Duration::from_mins(3)).await; + loop { + let current_execution_time = tokio::time::Instant::now(); + runtime.protect_broker(); + let next_execution_time = current_execution_time + Duration::from_mins(3); + let delay = + next_execution_time.saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + + let message_store = self.message_store.clone(); + self.broker_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + info!("Message store dispatch_behind_bytes Start scheduled task"); + tokio::time::sleep(Duration::from_secs(10)).await; + loop { + let current_execution_time = tokio::time::Instant::now(); + message_store.as_ref().unwrap().dispatch_behind_bytes(); + let next_execution_time = current_execution_time + Duration::from_secs(60); + let delay = + next_execution_time.saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + + if self.broker_config.enable_controller_mode { + self.update_master_haserver_addr_periodically = true; + } + } fn initial_transaction(&mut self) {} @@ -351,6 +469,8 @@ impl BrokerRuntime { fn initial_rpc_hooks(&mut self) {} fn initial_request_pipeline(&mut self) {} + fn protect_broker(&mut self) {} + pub async fn start(&mut self) { self.message_store .as_mut() diff --git a/rocketmq-broker/src/filter/manager/consumer_filter_manager.rs b/rocketmq-broker/src/filter/manager/consumer_filter_manager.rs index be75ef36..17bbae27 100644 --- a/rocketmq-broker/src/filter/manager/consumer_filter_manager.rs +++ b/rocketmq-broker/src/filter/manager/consumer_filter_manager.rs @@ -46,7 +46,7 @@ impl ConfigManager for ConsumerFilterManager { } fn encode_pretty(&self, pretty_format: bool) -> String { - todo!() + "".to_string() } fn decode(&self, json_string: &str) {} diff --git a/rocketmq-broker/src/lib.rs b/rocketmq-broker/src/lib.rs index 26c55492..acc8db8c 100644 --- a/rocketmq-broker/src/lib.rs +++ b/rocketmq-broker/src/lib.rs @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #![allow(dead_code)] +#![feature(duration_constructors)] pub use broker_bootstrap::{BrokerBootstrap, Builder}; diff --git a/rocketmq-broker/src/offset/manager/consumer_offset_manager.rs b/rocketmq-broker/src/offset/manager/consumer_offset_manager.rs index 25434762..bf36ee5a 100644 --- a/rocketmq-broker/src/offset/manager/consumer_offset_manager.rs +++ b/rocketmq-broker/src/offset/manager/consumer_offset_manager.rs @@ -49,7 +49,7 @@ impl ConfigManager for ConsumerOffsetManager { } fn encode_pretty(&self, pretty_format: bool) -> String { - todo!() + "".to_string() } fn decode(&self, json_string: &str) { diff --git a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs index 3ee26911..5140dbbf 100644 --- a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs +++ b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs @@ -52,7 +52,7 @@ impl ConfigManager for ConsumerOrderInfoManager { } fn encode_pretty(&self, pretty_format: bool) -> String { - todo!() + "".to_string() } fn decode(&self, json_string: &str) { diff --git a/rocketmq-broker/src/util/hook_utils.rs b/rocketmq-broker/src/util/hook_utils.rs index 71266fe5..b4206e71 100644 --- a/rocketmq-broker/src/util/hook_utils.rs +++ b/rocketmq-broker/src/util/hook_utils.rs @@ -479,6 +479,10 @@ mod tests { fn get_broker_stats_manager(&self) -> Option> { todo!() } + + fn dispatch_behind_bytes(&self) { + todo!() + } // Implement required methods... } diff --git a/rocketmq-common/src/common/broker/broker_config.rs b/rocketmq-common/src/common/broker/broker_config.rs index 1a1535d7..e0241b8d 100644 --- a/rocketmq-common/src/common/broker/broker_config.rs +++ b/rocketmq-common/src/common/broker/broker_config.rs @@ -118,6 +118,7 @@ pub struct BrokerConfig { pub enable_slave_acting_master: bool, pub reject_transaction_message: bool, pub enable_detail_stat: bool, + pub flush_consumer_offset_interval: u64, } impl Default for BrokerConfig { @@ -165,6 +166,7 @@ impl Default for BrokerConfig { enable_slave_acting_master: false, reject_transaction_message: false, enable_detail_stat: true, + flush_consumer_offset_interval: 1000 * 5, } } } diff --git a/rocketmq-common/src/utils/util_all.rs b/rocketmq-common/src/utils/util_all.rs index e52f3c71..ccc04bf6 100644 --- a/rocketmq-common/src/utils/util_all.rs +++ b/rocketmq-common/src/utils/util_all.rs @@ -232,6 +232,15 @@ pub fn compute_next_minutes_time_millis() -> u64 { ((millis_since_epoch / millis_in_minute) + 1) * millis_in_minute } +pub fn compute_next_morning_time_millis() -> u64 { + let now = Local::now(); + let tomorrow = now.date_naive().succ_opt().unwrap(); + let next_morning = Local + .with_ymd_and_hms(tomorrow.year(), tomorrow.month(), tomorrow.day(), 0, 0, 0) + .unwrap(); + next_morning.timestamp_millis() as u64 +} + #[cfg(test)] mod tests { use std::time::Instant; @@ -304,4 +313,15 @@ mod tests { assert!(next_minute > now); assert_eq!(next_minute % (60 * 1000), 0); } + + #[test] + fn compute_next_morning_time_millis_returns_correct_time() { + let now = Local::now(); + let next_morning = compute_next_morning_time_millis(); + let expected_next_morning = Local + .ymd(now.year(), now.month(), now.day() + 1) + .and_hms(0, 0, 0) + .timestamp_millis(); + assert_eq!(next_morning, expected_next_morning); + } } diff --git a/rocketmq-store/src/log_file.rs b/rocketmq-store/src/log_file.rs index 81b2645f..ad4ccc99 100644 --- a/rocketmq-store/src/log_file.rs +++ b/rocketmq-store/src/log_file.rs @@ -75,4 +75,6 @@ pub trait RocketMQMessageStore: Clone + 'static { fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook); fn get_broker_stats_manager(&self) -> Option>; + + fn dispatch_behind_bytes(&self); } diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index 769e4aa9..f2624687 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -579,6 +579,8 @@ impl MessageStore for DefaultMessageStore { fn get_broker_stats_manager(&self) -> Option> { self.broker_stats_manager.clone() } + + fn dispatch_behind_bytes(&self) {} } #[derive(Clone)] From 9befd21ec9e434d6209daf50acf61e28e50b07d0 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 9 Jun 2024 19:53:55 +0800 Subject: [PATCH 2/2] fix ci error --- rocketmq-common/src/utils/util_all.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-common/src/utils/util_all.rs b/rocketmq-common/src/utils/util_all.rs index ccc04bf6..ed3e5c28 100644 --- a/rocketmq-common/src/utils/util_all.rs +++ b/rocketmq-common/src/utils/util_all.rs @@ -322,6 +322,6 @@ mod tests { .ymd(now.year(), now.month(), now.day() + 1) .and_hms(0, 0, 0) .timestamp_millis(); - assert_eq!(next_morning, expected_next_morning); + assert_eq!(next_morning, expected_next_morning as u64); } }