Skip to content

Commit e63eaa1

Browse files
authored
[ISSUE #469]🚀Develop BrokerRuntime initializeBrokerScheduledTasks🚀 (#470)
* [ISSUE #469]🚀Develop BrokerRuntime initializeBrokerScheduledTasks🚀 * fix ci error
1 parent 338d84a commit e63eaa1

File tree

10 files changed

+161
-11
lines changed

10 files changed

+161
-11
lines changed

‎rocketmq-broker/src/broker_runtime.rs

+127-7
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ use std::{
2424
time::Duration,
2525
};
2626

27-
use rocketmq_common::common::{
28-
broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager,
29-
constant::PermName, server::config::ServerConfig,
27+
use rocketmq_common::{
28+
common::{
29+
broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager,
30+
constant::PermName, server::config::ServerConfig,
31+
},
32+
TimeUtils::get_current_millis,
33+
UtilAll::compute_next_morning_time_millis,
3034
};
3135
use rocketmq_remoting::{
3236
protocol::{
@@ -37,9 +41,12 @@ use rocketmq_remoting::{
3741
};
3842
use rocketmq_runtime::RocketMQRuntime;
3943
use rocketmq_store::{
40-
base::store_enum::StoreType, config::message_store_config::MessageStoreConfig,
41-
log_file::MessageStore, message_store::default_message_store::DefaultMessageStore,
42-
stats::broker_stats_manager::BrokerStatsManager, timer::timer_message_store::TimerMessageStore,
44+
base::store_enum::StoreType,
45+
config::message_store_config::MessageStoreConfig,
46+
log_file::MessageStore,
47+
message_store::default_message_store::DefaultMessageStore,
48+
stats::{broker_stats::BrokerStats, broker_stats_manager::BrokerStatsManager},
49+
timer::timer_message_store::TimerMessageStore,
4350
};
4451
use tracing::{info, warn};
4552

@@ -83,6 +90,8 @@ pub(crate) struct BrokerRuntime {
8390
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
8491
#[cfg(feature = "local_file_store")]
8592
message_store: Option<DefaultMessageStore>,
93+
#[cfg(feature = "local_file_store")]
94+
broker_stats: Option<Arc<BrokerStats<DefaultMessageStore>>>,
8695
//message_store: Option<Arc<Mutex<LocalFileMessageStore>>>,
8796
schedule_message_service: ScheduleMessageService,
8897
timer_message_store: Option<TimerMessageStore>,
@@ -96,6 +105,7 @@ pub(crate) struct BrokerRuntime {
96105
shutdown_hook: Option<BrokerShutdownHook>,
97106
broker_stats_manager: Arc<BrokerStatsManager>,
98107
topic_queue_mapping_clean_service: Option<Arc<TopicQueueMappingCleanService>>,
108+
update_master_haserver_addr_periodically: bool,
99109
}
100110

101111
impl Clone for BrokerRuntime {
@@ -111,6 +121,7 @@ impl Clone for BrokerRuntime {
111121
consumer_filter_manager: Arc::new(Default::default()),
112122
consumer_order_info_manager: Arc::new(Default::default()),
113123
message_store: self.message_store.clone(),
124+
broker_stats: self.broker_stats.clone(),
114125
schedule_message_service: Default::default(),
115126
timer_message_store: self.timer_message_store.clone(),
116127
broker_out_api: self.broker_out_api.clone(),
@@ -121,6 +132,7 @@ impl Clone for BrokerRuntime {
121132
shutdown_hook: self.shutdown_hook.clone(),
122133
broker_stats_manager: self.broker_stats_manager.clone(),
123134
topic_queue_mapping_clean_service: self.topic_queue_mapping_clean_service.clone(),
135+
update_master_haserver_addr_periodically: self.update_master_haserver_addr_periodically,
124136
}
125137
}
126138
}
@@ -161,6 +173,7 @@ impl BrokerRuntime {
161173
consumer_filter_manager: Arc::new(Default::default()),
162174
consumer_order_info_manager: Arc::new(Default::default()),
163175
message_store: None,
176+
broker_stats: None,
164177
schedule_message_service: Default::default(),
165178
timer_message_store: None,
166179
broker_out_api: broker_outer_api,
@@ -171,6 +184,7 @@ impl BrokerRuntime {
171184
shutdown_hook: None,
172185
broker_stats_manager,
173186
topic_queue_mapping_clean_service: None,
187+
update_master_haserver_addr_periodically: false,
174188
}
175189
}
176190

@@ -256,6 +270,7 @@ impl BrokerRuntime {
256270
);
257271
self.topic_config_manager
258272
.set_message_store(Some(message_store.clone()));
273+
self.broker_stats = Some(Arc::new(BrokerStats::new(Arc::new(message_store.clone()))));
259274
self.message_store = Some(message_store);
260275
} else if self.message_store_config.store_type == StoreType::RocksDB {
261276
info!("Use RocksDB as message store");
@@ -342,7 +357,110 @@ impl BrokerRuntime {
342357
}
343358
}
344359

345-
fn initialize_scheduled_tasks(&mut self) {}
360+
fn initialize_scheduled_tasks(&mut self) {
361+
let initial_delay = compute_next_morning_time_millis() - get_current_millis();
362+
let period = Duration::from_days(1).as_millis() as u64;
363+
let broker_stats = self.broker_stats.clone();
364+
self.broker_runtime
365+
.as_ref()
366+
.unwrap()
367+
.get_handle()
368+
.spawn(async move {
369+
info!("BrokerStats Start scheduled task");
370+
tokio::time::sleep(Duration::from_millis(initial_delay)).await;
371+
loop {
372+
let current_execution_time = tokio::time::Instant::now();
373+
broker_stats.as_ref().unwrap().record();
374+
let next_execution_time =
375+
current_execution_time + Duration::from_millis(period);
376+
let delay =
377+
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
378+
tokio::time::sleep(delay).await;
379+
}
380+
});
381+
382+
let consumer_offset_manager = self.consumer_offset_manager.clone();
383+
let flush_consumer_offset_interval = self.broker_config.flush_consumer_offset_interval;
384+
self.broker_runtime
385+
.as_ref()
386+
.unwrap()
387+
.get_handle()
388+
.spawn(async move {
389+
info!("Consumer offset manager Start scheduled task");
390+
tokio::time::sleep(Duration::from_millis(1000 * 10)).await;
391+
loop {
392+
let current_execution_time = tokio::time::Instant::now();
393+
consumer_offset_manager.persist();
394+
let next_execution_time = current_execution_time
395+
+ Duration::from_millis(flush_consumer_offset_interval);
396+
let delay =
397+
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
398+
tokio::time::sleep(delay).await;
399+
}
400+
});
401+
402+
let consumer_filter_manager = self.consumer_filter_manager.clone();
403+
let consumer_order_info_manager = self.consumer_order_info_manager.clone();
404+
self.broker_runtime
405+
.as_ref()
406+
.unwrap()
407+
.get_handle()
408+
.spawn(async move {
409+
info!("consumer filter manager Start scheduled task");
410+
info!("consumer order info manager Start scheduled task");
411+
tokio::time::sleep(Duration::from_millis(1000 * 10)).await;
412+
loop {
413+
let current_execution_time = tokio::time::Instant::now();
414+
consumer_filter_manager.persist();
415+
consumer_order_info_manager.persist();
416+
let next_execution_time =
417+
current_execution_time + Duration::from_millis(1000 * 10);
418+
let delay =
419+
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
420+
tokio::time::sleep(delay).await;
421+
}
422+
});
423+
424+
let mut runtime = self.clone();
425+
self.broker_runtime
426+
.as_ref()
427+
.unwrap()
428+
.get_handle()
429+
.spawn(async move {
430+
info!("Protect broker Start scheduled task");
431+
tokio::time::sleep(Duration::from_mins(3)).await;
432+
loop {
433+
let current_execution_time = tokio::time::Instant::now();
434+
runtime.protect_broker();
435+
let next_execution_time = current_execution_time + Duration::from_mins(3);
436+
let delay =
437+
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
438+
tokio::time::sleep(delay).await;
439+
}
440+
});
441+
442+
let message_store = self.message_store.clone();
443+
self.broker_runtime
444+
.as_ref()
445+
.unwrap()
446+
.get_handle()
447+
.spawn(async move {
448+
info!("Message store dispatch_behind_bytes Start scheduled task");
449+
tokio::time::sleep(Duration::from_secs(10)).await;
450+
loop {
451+
let current_execution_time = tokio::time::Instant::now();
452+
message_store.as_ref().unwrap().dispatch_behind_bytes();
453+
let next_execution_time = current_execution_time + Duration::from_secs(60);
454+
let delay =
455+
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
456+
tokio::time::sleep(delay).await;
457+
}
458+
});
459+
460+
if self.broker_config.enable_controller_mode {
461+
self.update_master_haserver_addr_periodically = true;
462+
}
463+
}
346464

347465
fn initial_transaction(&mut self) {}
348466

@@ -351,6 +469,8 @@ impl BrokerRuntime {
351469
fn initial_rpc_hooks(&mut self) {}
352470
fn initial_request_pipeline(&mut self) {}
353471

472+
fn protect_broker(&mut self) {}
473+
354474
pub async fn start(&mut self) {
355475
self.message_store
356476
.as_mut()

‎rocketmq-broker/src/filter/manager/consumer_filter_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl ConfigManager for ConsumerFilterManager {
4646
}
4747

4848
fn encode_pretty(&self, pretty_format: bool) -> String {
49-
todo!()
49+
"".to_string()
5050
}
5151

5252
fn decode(&self, json_string: &str) {}

‎rocketmq-broker/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
1817
#![allow(dead_code)]
18+
#![feature(duration_constructors)]
1919

2020
pub use broker_bootstrap::{BrokerBootstrap, Builder};
2121

‎rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl ConfigManager for ConsumerOffsetManager {
4949
}
5050

5151
fn encode_pretty(&self, pretty_format: bool) -> String {
52-
todo!()
52+
"".to_string()
5353
}
5454

5555
fn decode(&self, json_string: &str) {

‎rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl ConfigManager for ConsumerOrderInfoManager {
5252
}
5353

5454
fn encode_pretty(&self, pretty_format: bool) -> String {
55-
todo!()
55+
"".to_string()
5656
}
5757

5858
fn decode(&self, json_string: &str) {

‎rocketmq-broker/src/util/hook_utils.rs

+4
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,10 @@ mod tests {
479479
fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>> {
480480
todo!()
481481
}
482+
483+
fn dispatch_behind_bytes(&self) {
484+
todo!()
485+
}
482486
// Implement required methods...
483487
}
484488

‎rocketmq-common/src/common/broker/broker_config.rs

+2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ pub struct BrokerConfig {
118118
pub enable_slave_acting_master: bool,
119119
pub reject_transaction_message: bool,
120120
pub enable_detail_stat: bool,
121+
pub flush_consumer_offset_interval: u64,
121122
}
122123

123124
impl Default for BrokerConfig {
@@ -165,6 +166,7 @@ impl Default for BrokerConfig {
165166
enable_slave_acting_master: false,
166167
reject_transaction_message: false,
167168
enable_detail_stat: true,
169+
flush_consumer_offset_interval: 1000 * 5,
168170
}
169171
}
170172
}

‎rocketmq-common/src/utils/util_all.rs

+20
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,15 @@ pub fn compute_next_minutes_time_millis() -> u64 {
232232
((millis_since_epoch / millis_in_minute) + 1) * millis_in_minute
233233
}
234234

235+
pub fn compute_next_morning_time_millis() -> u64 {
236+
let now = Local::now();
237+
let tomorrow = now.date_naive().succ_opt().unwrap();
238+
let next_morning = Local
239+
.with_ymd_and_hms(tomorrow.year(), tomorrow.month(), tomorrow.day(), 0, 0, 0)
240+
.unwrap();
241+
next_morning.timestamp_millis() as u64
242+
}
243+
235244
#[cfg(test)]
236245
mod tests {
237246
use std::time::Instant;
@@ -304,4 +313,15 @@ mod tests {
304313
assert!(next_minute > now);
305314
assert_eq!(next_minute % (60 * 1000), 0);
306315
}
316+
317+
#[test]
318+
fn compute_next_morning_time_millis_returns_correct_time() {
319+
let now = Local::now();
320+
let next_morning = compute_next_morning_time_millis();
321+
let expected_next_morning = Local
322+
.ymd(now.year(), now.month(), now.day() + 1)
323+
.and_hms(0, 0, 0)
324+
.timestamp_millis();
325+
assert_eq!(next_morning, expected_next_morning as u64);
326+
}
307327
}

‎rocketmq-store/src/log_file.rs

+2
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,6 @@ pub trait RocketMQMessageStore: Clone + 'static {
7575
fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook);
7676

7777
fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>>;
78+
79+
fn dispatch_behind_bytes(&self);
7880
}

‎rocketmq-store/src/message_store/default_message_store.rs

+2
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,8 @@ impl MessageStore for DefaultMessageStore {
579579
fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>> {
580580
self.broker_stats_manager.clone()
581581
}
582+
583+
fn dispatch_behind_bytes(&self) {}
582584
}
583585

584586
#[derive(Clone)]

0 commit comments

Comments
 (0)