Skip to content

Commit 74cd1fb

Browse files
committed
[ISSUE #353]Add Broker shutdown logic
1 parent 834542d commit 74cd1fb

File tree

13 files changed

+442
-13
lines changed

13 files changed

+442
-13
lines changed

rocketmq-broker/src/broker.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub mod broker_hook;
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::sync::Arc;
19+
20+
pub trait ShutdownHook {
21+
fn before_shutdown(&self);
22+
}
23+
24+
pub type BrokerShutdownHook = Arc<dyn ShutdownHook + Send + Sync + 'static>;

rocketmq-broker/src/broker_runtime.rs

+24-7
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use rocketmq_store::{
4444
use tracing::{info, warn};
4545

4646
use crate::{
47+
broker::broker_hook::BrokerShutdownHook,
4748
client::manager::producer_manager::ProducerManager,
4849
filter::manager::consumer_filter_manager::ConsumerFilterManager,
4950
offset::manager::{
@@ -84,6 +85,8 @@ pub(crate) struct BrokerRuntime {
8485
broker_runtime: Option<RocketMQRuntime>,
8586
producer_manager: Arc<ProducerManager>,
8687
drop: Arc<AtomicBool>,
88+
shutdown: Arc<AtomicBool>,
89+
shutdown_hook: Option<BrokerShutdownHook>,
8790
}
8891

8992
impl Clone for BrokerRuntime {
@@ -105,6 +108,8 @@ impl Clone for BrokerRuntime {
105108
broker_runtime: None,
106109
producer_manager: self.producer_manager.clone(),
107110
drop: self.drop.clone(),
111+
shutdown: self.shutdown.clone(),
112+
shutdown_hook: self.shutdown_hook.clone(),
108113
}
109114
}
110115
}
@@ -151,6 +156,8 @@ impl BrokerRuntime {
151156
broker_runtime: Some(runtime),
152157
producer_manager: Arc::new(ProducerManager::new()),
153158
drop: Arc::new(AtomicBool::new(false)),
159+
shutdown: Arc::new(AtomicBool::new(false)),
160+
shutdown_hook: None,
154161
}
155162
}
156163

@@ -161,6 +168,22 @@ impl BrokerRuntime {
161168
pub(crate) fn message_store_config(&self) -> &MessageStoreConfig {
162169
&self.message_store_config
163170
}
171+
172+
pub fn shutdown(&mut self) {
173+
self.broker_out_api.shutdown();
174+
self.message_store.as_mut().unwrap().shutdown();
175+
if let Some(runtime) = self.broker_runtime.take() {
176+
runtime.shutdown();
177+
}
178+
}
179+
180+
pub(crate) fn shutdown_basic_service(&mut self) {
181+
self.shutdown.store(true, Ordering::SeqCst);
182+
183+
if let Some(hook) = self.shutdown_hook.as_ref() {
184+
hook.before_shutdown();
185+
}
186+
}
164187
}
165188

166189
impl Drop for BrokerRuntime {
@@ -170,14 +193,8 @@ impl Drop for BrokerRuntime {
170193
.clone()
171194
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed);
172195
if result.is_ok() {
173-
if let Some(runtime) = self.broker_runtime.take() {
174-
runtime.shutdown();
175-
}
196+
self.shutdown();
176197
}
177-
178-
/*if let Some(runtime) = self.broker_runtime.take() {
179-
runtime.shutdown();
180-
}*/
181198
}
182199
}
183200

rocketmq-broker/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub use broker_bootstrap::{BrokerBootstrap, Builder};
2121

2222
pub mod command;
2323

24+
mod broker;
2425
mod broker_bootstrap;
2526
mod broker_path_config_helper;
2627
mod broker_runtime;

rocketmq-broker/src/out_api/broker_outer_api.rs

+2
Original file line numberDiff line numberDiff line change
@@ -220,4 +220,6 @@ impl BrokerOuterAPI {
220220
let _result = tokio::join!(handle);
221221
}
222222
}
223+
224+
pub fn shutdown(&self) {}
223225
}

rocketmq-store/src/consume_queue/mapped_file_queue.rs

-2
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,6 @@ impl MappedFileQueue {
265265

266266
#[cfg(test)]
267267
mod tests {
268-
use std::fs;
269-
270268
use super::*;
271269

272270
#[test]

rocketmq-store/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@ pub mod message_store;
3030
mod queue;
3131
pub(crate) mod services;
3232
pub mod status;
33+
mod store;
3334
pub mod store_path_config_helper;
3435
pub mod timer;

rocketmq-store/src/log_file.rs

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub trait RocketMQMessageStore: Clone {
3939
/// Throws an `Exception` if there is any error.
4040
fn start(&mut self) -> Result<(), Box<dyn std::error::Error>>;
4141

42+
fn shutdown(&mut self);
43+
4244
fn set_confirm_offset(&mut self, phy_offset: i64);
4345

4446
fn get_max_phy_offset(&self) -> i64;

rocketmq-store/src/log_file/commit_log.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ impl CommitLog {
127127
result
128128
}
129129

130+
pub fn shutdown(&mut self) {}
131+
132+
pub fn destroy(&mut self) {}
130133
/* pub fn set_local_file_message_store(
131134
&mut self,
132135
local_file_message_store: Weak<Mutex<LocalFileMessageStore>>,
@@ -280,7 +283,6 @@ impl CommitLog {
280283
true,
281284
&message_store_config,
282285
);
283-
println!("{}", dispatch_request);
284286
current_pos += size;
285287
if dispatch_request.success && dispatch_request.msg_size > 0 {
286288
last_valid_msg_phy_offset = process_offset + mapped_file_offset;
@@ -402,7 +404,6 @@ fn check_message_and_return_size(
402404
read_body: bool,
403405
message_store_config: &Arc<MessageStoreConfig>,
404406
) -> DispatchRequest {
405-
info!("check_message_and_return_size");
406407
let total_size = bytes.get_i32();
407408
let magic_code = bytes.get_i32();
408409
if magic_code == MESSAGE_MAGIC_CODE || magic_code == MESSAGE_MAGIC_CODE_V2 {

rocketmq-store/src/message_store/default_message_store.rs

+34-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ use std::{
2020
collections::HashMap,
2121
error::Error,
2222
fs,
23+
path::PathBuf,
2324
sync::{
24-
atomic::{AtomicI64, Ordering},
25+
atomic::{AtomicBool, AtomicI64, Ordering},
2526
Arc,
2627
},
2728
time::Instant,
@@ -57,6 +58,7 @@ use crate::{
5758
build_consume_queue::CommitLogDispatcherBuildConsumeQueue,
5859
local_file_consume_queue_store::ConsumeQueueStore, ConsumeQueueStoreTrait,
5960
},
61+
store::running_flags::RunningFlags,
6062
store_path_config_helper::{get_abort_file, get_store_checkpoint},
6163
};
6264

@@ -77,6 +79,8 @@ pub struct DefaultMessageStore {
7779
dispatcher: CommitLogDispatcherDefault,
7880
broker_init_max_offset: Arc<AtomicI64>,
7981
state_machine_version: Arc<AtomicI64>,
82+
shutdown: Arc<AtomicBool>,
83+
running_flags: Arc<RunningFlags>,
8084
}
8185

8286
impl Clone for DefaultMessageStore {
@@ -96,6 +100,8 @@ impl Clone for DefaultMessageStore {
96100
dispatcher: self.dispatcher.clone(),
97101
broker_init_max_offset: self.broker_init_max_offset.clone(),
98102
state_machine_version: self.state_machine_version.clone(),
103+
shutdown: self.shutdown.clone(),
104+
running_flags: self.running_flags.clone(),
99105
}
100106
}
101107
}
@@ -141,6 +147,8 @@ impl DefaultMessageStore {
141147
dispatcher,
142148
broker_init_max_offset: Arc::new(AtomicI64::new(-1)),
143149
state_machine_version: Arc::new(AtomicI64::new(0)),
150+
shutdown: Arc::new(AtomicBool::new(false)),
151+
running_flags: Arc::new(RunningFlags::new()),
144152
}
145153
}
146154
}
@@ -268,6 +276,17 @@ impl DefaultMessageStore {
268276
pub fn consume_queue_store_mut(&mut self) -> &mut ConsumeQueueStore {
269277
&mut self.consume_queue_store
270278
}
279+
280+
fn delete_file(&mut self, file_name: String) {
281+
match fs::remove_file(PathBuf::from(file_name.as_str())) {
282+
Ok(_) => {
283+
info!("delete OK, file:{}", file_name);
284+
}
285+
Err(err) => {
286+
error!("delete error, file:{}, {:?}", file_name, err);
287+
}
288+
}
289+
}
271290
}
272291

273292
impl MessageStore for DefaultMessageStore {
@@ -328,6 +347,20 @@ impl MessageStore for DefaultMessageStore {
328347
Ok(())
329348
}
330349

350+
fn shutdown(&mut self) {
351+
if !self.shutdown.load(Ordering::Relaxed) {
352+
self.shutdown.store(true, Ordering::SeqCst);
353+
self.commit_log.shutdown();
354+
355+
if self.running_flags.is_writeable() {
356+
//delete abort file
357+
self.delete_file(get_abort_file(
358+
self.message_store_config.store_path_root_dir.as_str(),
359+
))
360+
}
361+
}
362+
}
363+
331364
fn set_confirm_offset(&mut self, phy_offset: i64) {
332365

333366
// self.commit_log.set_confirm_offset(phy_offset);

rocketmq-store/src/store.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub(crate) mod running_flags;

0 commit comments

Comments
 (0)