Skip to content

[ISSUE #469]🚀Develop BrokerRuntime initializeBrokerScheduledTasks🚀 #470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 127 additions & 7 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
time::Duration,
};

use rocketmq_common::common::{
broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager,
constant::PermName, server::config::ServerConfig,
use rocketmq_common::{
common::{
broker::broker_config::BrokerConfig, config::TopicConfig, config_manager::ConfigManager,
constant::PermName, server::config::ServerConfig,
},
TimeUtils::get_current_millis,
UtilAll::compute_next_morning_time_millis,
};
use rocketmq_remoting::{
protocol::{
Expand All @@ -37,9 +41,12 @@
};
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_store::{
base::store_enum::StoreType, config::message_store_config::MessageStoreConfig,
log_file::MessageStore, message_store::default_message_store::DefaultMessageStore,
stats::broker_stats_manager::BrokerStatsManager, timer::timer_message_store::TimerMessageStore,
base::store_enum::StoreType,
config::message_store_config::MessageStoreConfig,
log_file::MessageStore,
message_store::default_message_store::DefaultMessageStore,
stats::{broker_stats::BrokerStats, broker_stats_manager::BrokerStatsManager},
timer::timer_message_store::TimerMessageStore,
};
use tracing::{info, warn};

Expand Down Expand Up @@ -83,6 +90,8 @@
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
#[cfg(feature = "local_file_store")]
message_store: Option<DefaultMessageStore>,
#[cfg(feature = "local_file_store")]
broker_stats: Option<Arc<BrokerStats<DefaultMessageStore>>>,
//message_store: Option<Arc<Mutex<LocalFileMessageStore>>>,
schedule_message_service: ScheduleMessageService,
timer_message_store: Option<TimerMessageStore>,
Expand All @@ -96,6 +105,7 @@
shutdown_hook: Option<BrokerShutdownHook>,
broker_stats_manager: Arc<BrokerStatsManager>,
topic_queue_mapping_clean_service: Option<Arc<TopicQueueMappingCleanService>>,
update_master_haserver_addr_periodically: bool,
}

impl Clone for BrokerRuntime {
Expand All @@ -111,6 +121,7 @@
consumer_filter_manager: Arc::new(Default::default()),
consumer_order_info_manager: Arc::new(Default::default()),
message_store: self.message_store.clone(),
broker_stats: self.broker_stats.clone(),

Check warning on line 124 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L124

Added line #L124 was not covered by tests
schedule_message_service: Default::default(),
timer_message_store: self.timer_message_store.clone(),
broker_out_api: self.broker_out_api.clone(),
Expand All @@ -121,6 +132,7 @@
shutdown_hook: self.shutdown_hook.clone(),
broker_stats_manager: self.broker_stats_manager.clone(),
topic_queue_mapping_clean_service: self.topic_queue_mapping_clean_service.clone(),
update_master_haserver_addr_periodically: self.update_master_haserver_addr_periodically,

Check warning on line 135 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L135

Added line #L135 was not covered by tests
}
}
}
Expand Down Expand Up @@ -161,6 +173,7 @@
consumer_filter_manager: Arc::new(Default::default()),
consumer_order_info_manager: Arc::new(Default::default()),
message_store: None,
broker_stats: None,

Check warning on line 176 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L176

Added line #L176 was not covered by tests
schedule_message_service: Default::default(),
timer_message_store: None,
broker_out_api: broker_outer_api,
Expand All @@ -171,6 +184,7 @@
shutdown_hook: None,
broker_stats_manager,
topic_queue_mapping_clean_service: None,
update_master_haserver_addr_periodically: false,
}
}

Expand Down Expand Up @@ -256,6 +270,7 @@
);
self.topic_config_manager
.set_message_store(Some(message_store.clone()));
self.broker_stats = Some(Arc::new(BrokerStats::new(Arc::new(message_store.clone()))));

Check warning on line 273 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L273

Added line #L273 was not covered by tests
self.message_store = Some(message_store);
} else if self.message_store_config.store_type == StoreType::RocksDB {
info!("Use RocksDB as message store");
Expand Down Expand Up @@ -342,7 +357,110 @@
}
}

fn initialize_scheduled_tasks(&mut self) {}
fn initialize_scheduled_tasks(&mut self) {
let initial_delay = compute_next_morning_time_millis() - get_current_millis();
let period = Duration::from_days(1).as_millis() as u64;
let broker_stats = self.broker_stats.clone();
self.broker_runtime

Check warning on line 364 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L360-L364

Added lines #L360 - L364 were not covered by tests
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
info!("BrokerStats Start scheduled task");
tokio::time::sleep(Duration::from_millis(initial_delay)).await;

Check warning on line 370 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L368-L370

Added lines #L368 - L370 were not covered by tests
loop {
let current_execution_time = tokio::time::Instant::now();
broker_stats.as_ref().unwrap().record();

Check warning on line 373 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L372-L373

Added lines #L372 - L373 were not covered by tests
let next_execution_time =
current_execution_time + Duration::from_millis(period);

Check warning on line 375 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L375

Added line #L375 was not covered by tests
let delay =
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;

Check warning on line 378 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L377-L378

Added lines #L377 - L378 were not covered by tests
}
});

Check warning on line 380 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L380

Added line #L380 was not covered by tests

let consumer_offset_manager = self.consumer_offset_manager.clone();
let flush_consumer_offset_interval = self.broker_config.flush_consumer_offset_interval;
self.broker_runtime

Check warning on line 384 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L382-L384

Added lines #L382 - L384 were not covered by tests
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
info!("Consumer offset manager Start scheduled task");
tokio::time::sleep(Duration::from_millis(1000 * 10)).await;

Check warning on line 390 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L388-L390

Added lines #L388 - L390 were not covered by tests
loop {
let current_execution_time = tokio::time::Instant::now();
consumer_offset_manager.persist();
let next_execution_time = current_execution_time
+ Duration::from_millis(flush_consumer_offset_interval);

Check warning on line 395 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L392-L395

Added lines #L392 - L395 were not covered by tests
let delay =
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;

Check warning on line 398 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L397-L398

Added lines #L397 - L398 were not covered by tests
}
});

Check warning on line 400 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L400

Added line #L400 was not covered by tests

let consumer_filter_manager = self.consumer_filter_manager.clone();
let consumer_order_info_manager = self.consumer_order_info_manager.clone();
self.broker_runtime

Check warning on line 404 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L402-L404

Added lines #L402 - L404 were not covered by tests
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
info!("consumer filter manager Start scheduled task");
info!("consumer order info manager Start scheduled task");
tokio::time::sleep(Duration::from_millis(1000 * 10)).await;

Check warning on line 411 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L408-L411

Added lines #L408 - L411 were not covered by tests
loop {
let current_execution_time = tokio::time::Instant::now();
consumer_filter_manager.persist();
consumer_order_info_manager.persist();

Check warning on line 415 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L413-L415

Added lines #L413 - L415 were not covered by tests
let next_execution_time =
current_execution_time + Duration::from_millis(1000 * 10);

Check warning on line 417 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L417

Added line #L417 was not covered by tests
let delay =
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;

Check warning on line 420 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L419-L420

Added lines #L419 - L420 were not covered by tests
}
});

Check warning on line 422 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L422

Added line #L422 was not covered by tests

let mut runtime = self.clone();
self.broker_runtime

Check warning on line 425 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L424-L425

Added lines #L424 - L425 were not covered by tests
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
info!("Protect broker Start scheduled task");
tokio::time::sleep(Duration::from_mins(3)).await;

Check warning on line 431 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L429-L431

Added lines #L429 - L431 were not covered by tests
loop {
let current_execution_time = tokio::time::Instant::now();
runtime.protect_broker();
let next_execution_time = current_execution_time + Duration::from_mins(3);

Check warning on line 435 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L433-L435

Added lines #L433 - L435 were not covered by tests
let delay =
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;

Check warning on line 438 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L437-L438

Added lines #L437 - L438 were not covered by tests
}
});

Check warning on line 440 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L440

Added line #L440 was not covered by tests

let message_store = self.message_store.clone();
self.broker_runtime

Check warning on line 443 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L442-L443

Added lines #L442 - L443 were not covered by tests
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
info!("Message store dispatch_behind_bytes Start scheduled task");
tokio::time::sleep(Duration::from_secs(10)).await;

Check warning on line 449 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L447-L449

Added lines #L447 - L449 were not covered by tests
loop {
let current_execution_time = tokio::time::Instant::now();
message_store.as_ref().unwrap().dispatch_behind_bytes();
let next_execution_time = current_execution_time + Duration::from_secs(60);

Check warning on line 453 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L451-L453

Added lines #L451 - L453 were not covered by tests
let delay =
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;

Check warning on line 456 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L455-L456

Added lines #L455 - L456 were not covered by tests
}
});

Check warning on line 458 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L458

Added line #L458 was not covered by tests

if self.broker_config.enable_controller_mode {
self.update_master_haserver_addr_periodically = true;

Check warning on line 461 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L460-L461

Added lines #L460 - L461 were not covered by tests
}
}

Check warning on line 463 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L463

Added line #L463 was not covered by tests

fn initial_transaction(&mut self) {}

Expand All @@ -351,6 +469,8 @@
fn initial_rpc_hooks(&mut self) {}
fn initial_request_pipeline(&mut self) {}

fn protect_broker(&mut self) {}

Check warning on line 472 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L472

Added line #L472 was not covered by tests

pub async fn start(&mut self) {
self.message_store
.as_mut()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
}

fn encode_pretty(&self, pretty_format: bool) -> String {
todo!()
"".to_string()

Check warning on line 49 in rocketmq-broker/src/filter/manager/consumer_filter_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/filter/manager/consumer_filter_manager.rs#L49

Added line #L49 was not covered by tests
}

fn decode(&self, json_string: &str) {}
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#![allow(dead_code)]
#![feature(duration_constructors)]

pub use broker_bootstrap::{BrokerBootstrap, Builder};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
}

fn encode_pretty(&self, pretty_format: bool) -> String {
todo!()
"".to_string()

Check warning on line 52 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L52

Added line #L52 was not covered by tests
}

fn decode(&self, json_string: &str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
}

fn encode_pretty(&self, pretty_format: bool) -> String {
todo!()
"".to_string()

Check warning on line 55 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L55

Added line #L55 was not covered by tests
}

fn decode(&self, json_string: &str) {
Expand Down
4 changes: 4 additions & 0 deletions rocketmq-broker/src/util/hook_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@
fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>> {
todo!()
}

fn dispatch_behind_bytes(&self) {
todo!()

Check warning on line 484 in rocketmq-broker/src/util/hook_utils.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/util/hook_utils.rs#L483-L484

Added lines #L483 - L484 were not covered by tests
}
// Implement required methods...
}

Expand Down
2 changes: 2 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
pub enable_slave_acting_master: bool,
pub reject_transaction_message: bool,
pub enable_detail_stat: bool,
pub flush_consumer_offset_interval: u64,

Check warning on line 121 in rocketmq-common/src/common/broker/broker_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/broker/broker_config.rs#L121

Added line #L121 was not covered by tests
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -165,6 +166,7 @@
enable_slave_acting_master: false,
reject_transaction_message: false,
enable_detail_stat: true,
flush_consumer_offset_interval: 1000 * 5,
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions rocketmq-common/src/utils/util_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,15 @@ pub fn compute_next_minutes_time_millis() -> u64 {
((millis_since_epoch / millis_in_minute) + 1) * millis_in_minute
}

pub fn compute_next_morning_time_millis() -> u64 {
let now = Local::now();
let tomorrow = now.date_naive().succ_opt().unwrap();
let next_morning = Local
.with_ymd_and_hms(tomorrow.year(), tomorrow.month(), tomorrow.day(), 0, 0, 0)
.unwrap();
next_morning.timestamp_millis() as u64
}

#[cfg(test)]
mod tests {
use std::time::Instant;
Expand Down Expand Up @@ -304,4 +313,15 @@ mod tests {
assert!(next_minute > now);
assert_eq!(next_minute % (60 * 1000), 0);
}

#[test]
fn compute_next_morning_time_millis_returns_correct_time() {
let now = Local::now();
let next_morning = compute_next_morning_time_millis();
let expected_next_morning = Local
.ymd(now.year(), now.month(), now.day() + 1)
.and_hms(0, 0, 0)
.timestamp_millis();
assert_eq!(next_morning, expected_next_morning as u64);
}
}
2 changes: 2 additions & 0 deletions rocketmq-store/src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ pub trait RocketMQMessageStore: Clone + 'static {
fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook);

fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>>;

fn dispatch_behind_bytes(&self);
}
2 changes: 2 additions & 0 deletions rocketmq-store/src/message_store/default_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,8 @@
fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>> {
self.broker_stats_manager.clone()
}

fn dispatch_behind_bytes(&self) {}

Check warning on line 583 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L583

Added line #L583 was not covered by tests
}

#[derive(Clone)]
Expand Down
Loading