Skip to content

Commit 42682c9

Browse files
committed
[ISSUE #441]🔥Improving PutMessageHook logic🚀
1 parent 608b44b commit 42682c9

13 files changed

+212
-19
lines changed

Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,6 @@ parking_lot = "0.12"
6666
dirs = "5.0"
6767
trait-variant = "0.1.2"
6868

69-
once_cell = "1.19.0"
69+
once_cell = "1.19.0"
70+
71+
mockall = "0.12.1"

rocketmq-broker/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ dirs.workspace = true
5656

5757
local-ip-address = "0.6.1"
5858

59+
[dev-dependencies]
60+
mockall = "0.12.1"
61+
5962
[[bin]]
6063
name = "rocketmq-broker-rust"
6164
path = "src/bin/broker_bootstrap_server.rs"

rocketmq-broker/src/broker_runtime.rs

+17
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ use crate::{
4747
broker::broker_hook::BrokerShutdownHook,
4848
client::manager::producer_manager::ProducerManager,
4949
filter::manager::consumer_filter_manager::ConsumerFilterManager,
50+
hook::{
51+
batch_check_before_put_message::BatchCheckBeforePutMessageHook,
52+
check_before_put_message::CheckBeforePutMessageHook,
53+
},
5054
offset::manager::{
5155
consumer_offset_manager::ConsumerOffsetManager,
5256
consumer_order_info_manager::ConsumerOrderInfoManager,
@@ -260,6 +264,7 @@ impl BrokerRuntime {
260264
unimplemented!()
261265
}
262266
if self.message_store.is_some() {
267+
self.register_message_store_hook();
263268
self.message_store.as_mut().unwrap().load().await;
264269
}
265270

@@ -279,6 +284,18 @@ impl BrokerRuntime {
279284
result
280285
}
281286

287+
pub fn register_message_store_hook(&mut self) {
288+
if let Some(ref mut message_store) = self.message_store {
289+
message_store.set_put_message_hook(Box::new(CheckBeforePutMessageHook::new(
290+
message_store.clone(),
291+
self.message_store_config.clone(),
292+
)));
293+
message_store.set_put_message_hook(Box::new(BatchCheckBeforePutMessageHook::new(
294+
self.topic_config_manager.topic_config_table(),
295+
)));
296+
}
297+
}
298+
282299
fn initialize_remoting_server(&mut self) {
283300

284301
// fast broker server implementation in future versions

rocketmq-broker/src/hook.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub(crate) mod batch_check_before_put_message;
18+
pub(crate) mod check_before_put_message;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::{collections::HashMap, sync::Arc};
18+
19+
use rocketmq_common::common::{config::TopicConfig, message::message_single::MessageExt};
20+
use rocketmq_store::{
21+
base::message_result::PutMessageResult, hook::put_message_hook::PutMessageHook,
22+
};
23+
24+
use crate::util::hook_utils::HookUtils;
25+
26+
pub struct BatchCheckBeforePutMessageHook {
27+
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
28+
}
29+
30+
impl BatchCheckBeforePutMessageHook {
31+
pub fn new(topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>) -> Self {
32+
Self { topic_config_table }
33+
}
34+
}
35+
36+
impl PutMessageHook for BatchCheckBeforePutMessageHook {
37+
fn hook_name(&self) -> String {
38+
"batchCheckBeforePutMessage".to_string()
39+
}
40+
41+
fn execute_before_put_message(&self, msg: &MessageExt) -> Option<PutMessageResult> {
42+
HookUtils::check_inner_batch(&self.topic_config_table, msg)
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::sync::Arc;
18+
19+
use rocketmq_common::common::message::message_single::MessageExt;
20+
use rocketmq_store::{
21+
base::message_result::PutMessageResult, config::message_store_config::MessageStoreConfig,
22+
hook::put_message_hook::PutMessageHook, log_file::MessageStore,
23+
};
24+
25+
use crate::util::hook_utils::HookUtils;
26+
27+
pub struct CheckBeforePutMessageHook<MS> {
28+
message_store: MS,
29+
message_store_config: Arc<MessageStoreConfig>,
30+
}
31+
32+
impl<MS: MessageStore> CheckBeforePutMessageHook<MS> {
33+
pub fn new(message_store: MS, message_store_config: Arc<MessageStoreConfig>) -> Self {
34+
Self {
35+
message_store,
36+
message_store_config,
37+
}
38+
}
39+
}
40+
41+
impl<MS: MessageStore> PutMessageHook for CheckBeforePutMessageHook<MS> {
42+
fn hook_name(&self) -> String {
43+
"checkBeforePutMessage".to_string()
44+
}
45+
46+
fn execute_before_put_message(&self, msg: &MessageExt) -> Option<PutMessageResult> {
47+
HookUtils::check_before_put_message(&self.message_store, &self.message_store_config, msg)
48+
}
49+
}

rocketmq-broker/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod client;
2929
mod coldctr;
3030
mod controller;
3131
mod filter;
32+
mod hook;
3233
mod longpolling;
3334
mod mqtrace;
3435
mod offset;

rocketmq-broker/src/util/hook_utils.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,11 @@ impl HookUtils {
132132
PutMessageStatus::MessageIllegal,
133133
));
134134
}
135-
/*if message_store.is_os_page_cache_busy() {
136-
return return Some(PutMessageResult::new_default(
135+
if message_store.is_os_page_cache_busy() {
136+
return Some(PutMessageResult::new_default(
137137
PutMessageStatus::OsPageCacheBusy,
138138
));
139-
}*/
139+
}
140140

141141
None
142142
}
@@ -399,10 +399,12 @@ impl HookUtils {
399399
mod tests {
400400
use std::{collections::HashMap, error::Error, sync::Arc};
401401

402+
use parking_lot::RwLock;
402403
use rocketmq_common::common::{config::TopicConfig, message::message_single::MessageExt};
403404
use rocketmq_store::{
404405
base::{message_result::PutMessageResult, message_status_enum::PutMessageStatus},
405406
config::message_store_config::MessageStoreConfig,
407+
hook::put_message_hook::BoxedPutMessageHook,
406408
log_file::MessageStore,
407409
store::running_flags::RunningFlags,
408410
};
@@ -464,6 +466,14 @@ mod tests {
464466
fn is_shutdown(&self) -> bool {
465467
false
466468
}
469+
470+
fn get_put_message_hook_list(&self) -> Arc<RwLock<Vec<BoxedPutMessageHook>>> {
471+
todo!()
472+
}
473+
474+
fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook) {
475+
todo!()
476+
}
467477
// Implement required methods...
468478
}
469479

rocketmq-store/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,5 @@ log = "0.4.20"
6060
memmap2 = "0.9.4"
6161
trait-variant.workspace = true
6262
[dev-dependencies]
63-
tempfile = "3.10.0"
63+
tempfile = "3.10.0"
64+
mockall = { workspace = true }

rocketmq-store/src/config/message_store_config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub struct MessageStoreConfig {
128128
pub debug_lock_enable: bool,
129129
pub duplication_enable: bool,
130130
pub disk_fall_recorded: bool,
131-
pub os_page_cache_busy_timeout_mills: usize,
131+
pub os_page_cache_busy_timeout_mills: u64,
132132
pub default_query_max_num: usize,
133133
pub transient_store_pool_enable: bool,
134134
pub transient_store_pool_size: usize,
@@ -311,7 +311,7 @@ impl Default for MessageStoreConfig {
311311
debug_lock_enable: false,
312312
duplication_enable: false,
313313
disk_fall_recorded: false,
314-
os_page_cache_busy_timeout_mills: 0,
314+
os_page_cache_busy_timeout_mills: 1000,
315315
default_query_max_num: 0,
316316
transient_store_pool_enable: false,
317317
transient_store_pool_size: 0,

rocketmq-store/src/log_file.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,21 @@
1515
* limitations under the License.
1616
*/
1717

18+
use std::sync::Arc;
19+
1820
use rocketmq_common::{
1921
common::message::message_single::MessageExtBrokerInner, TimeUtils::get_current_millis,
2022
};
2123

22-
use crate::{base::message_result::PutMessageResult, store::running_flags::RunningFlags};
24+
use crate::{
25+
base::message_result::PutMessageResult, hook::put_message_hook::BoxedPutMessageHook,
26+
store::running_flags::RunningFlags,
27+
};
2328

2429
pub mod commit_log;
2530
pub mod flush_manager_impl;
2631
pub mod mapped_file;
2732

28-
#[allow(async_fn_in_trait)]
2933
#[trait_variant::make(MessageStore: Send)]
3034
pub trait RocketMQMessageStore: Clone + 'static {
3135
/// Load previously stored messages.
@@ -65,4 +69,8 @@ pub trait RocketMQMessageStore: Clone + 'static {
6569
fn get_running_flags(&self) -> &RunningFlags;
6670

6771
fn is_shutdown(&self) -> bool;
72+
73+
fn get_put_message_hook_list(&self) -> Arc<parking_lot::RwLock<Vec<BoxedPutMessageHook>>>;
74+
75+
fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook);
6876
}

rocketmq-store/src/log_file/commit_log.rs

+32-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::{cell::Cell, collections::HashMap, mem, sync::Arc};
18+
use std::{
19+
cell::Cell,
20+
collections::HashMap,
21+
mem,
22+
sync::{atomic::AtomicU64, Arc},
23+
};
1924

2025
use bytes::{Buf, Bytes, BytesMut};
2126
use rocketmq_common::{
@@ -167,6 +172,7 @@ pub struct CommitLog {
167172
consume_queue_store: ConsumeQueueStore,
168173
flush_manager: Arc<tokio::sync::Mutex<DefaultFlushManager>>,
169174
//flush_manager: Arc<parking_lot::Mutex<DefaultFlushManager>>,
175+
begin_time_in_lock: Arc<AtomicU64>,
170176
}
171177

172178
impl CommitLog {
@@ -203,11 +209,7 @@ impl CommitLog {
203209
mapped_file_queue,
204210
store_checkpoint,
205211
))),
206-
/*flush_manager: Arc::new(parking_lot::Mutex::new(DefaultFlushManager::new(
207-
message_store_config,
208-
mapped_file_queue,
209-
store_checkpoint,
210-
))),*/
212+
begin_time_in_lock: Arc::new(AtomicU64::new(0)),
211213
}
212214
}
213215
}
@@ -319,6 +321,10 @@ impl CommitLog {
319321
msg.encoded_buff = Some(encoded_buff);
320322
let put_message_context = PutMessageContext::new(topic_queue_key);
321323
let lock = self.put_message_lock.lock().await;
324+
self.begin_time_in_lock.store(
325+
time_utils::get_current_millis(),
326+
std::sync::atomic::Ordering::Release,
327+
);
322328
let start_time = Instant::now();
323329
// Here settings are stored timestamp, in order to ensure an orderly global
324330
if !self.message_store_config.duplication_enable {
@@ -333,6 +339,13 @@ impl CommitLog {
333339

334340
if mapped_file.is_none() {
335341
drop(lock);
342+
error!(
343+
"create mapped file error, topic: {} clientAddr: {}",
344+
msg.topic(),
345+
msg.born_host()
346+
);
347+
self.begin_time_in_lock
348+
.store(0, std::sync::atomic::Ordering::Release);
336349
return PutMessageResult::new_default(PutMessageStatus::CreateMappedFileFailed);
337350
}
338351

@@ -353,6 +366,8 @@ impl CommitLog {
353366
.mapped_file_queue
354367
.get_last_mapped_file_mut_start_offset(0, true);
355368
if mapped_file.is_none() {
369+
self.begin_time_in_lock
370+
.store(0, std::sync::atomic::Ordering::Release);
356371
error!(
357372
"create mapped file error, topic: {} clientAddr: {}",
358373
msg.topic(),
@@ -379,15 +394,21 @@ impl CommitLog {
379394
}
380395
AppendMessageStatus::MessageSizeExceeded
381396
| AppendMessageStatus::PropertiesSizeExceeded => {
397+
self.begin_time_in_lock
398+
.store(0, std::sync::atomic::Ordering::Release);
382399
PutMessageResult::new_append_result(PutMessageStatus::MessageIllegal, Some(result))
383400
}
384401
AppendMessageStatus::UnknownError => {
402+
self.begin_time_in_lock
403+
.store(0, std::sync::atomic::Ordering::Release);
385404
PutMessageResult::new_append_result(PutMessageStatus::UnknownError, Some(result))
386405
}
387406
};
388407
let elapsed_time_in_lock = start_time.elapsed().as_millis() as u64;
389408
drop(lock);
390-
if elapsed_time_in_lock > 100 {
409+
self.begin_time_in_lock
410+
.store(0, std::sync::atomic::Ordering::Release);
411+
if elapsed_time_in_lock > 500 {
391412
warn!(
392413
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} \
393414
AppendMessageResult={:?}",
@@ -873,6 +894,10 @@ impl CommitLog {
873894
pub fn check_self(&self) {
874895
self.mapped_file_queue.check_self();
875896
}
897+
898+
pub fn begin_time_in_lock(&self) -> &Arc<AtomicU64> {
899+
&self.begin_time_in_lock
900+
}
876901
}
877902

878903
pub fn check_message_and_return_size(

0 commit comments

Comments
 (0)