Skip to content

Commit fa79f26

Browse files
authored
[ISSUE #334]🚧Optimize commitlog recover-2 (#335)
1 parent 523ce23 commit fa79f26

20 files changed

+1416
-636
lines changed

rocketmq-cli/src/content_show.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub fn print_content(from: Option<u32>, to: Option<u32>, path: Option<PathBuf>)
2929
let path_buf = path.unwrap().into_os_string();
3030
let file_metadata = fs::metadata(path_buf.clone()).unwrap();
3131
println!("file size: {}B", file_metadata.len());
32-
let mut mapped_file = LocalMappedFile::new(
32+
let mapped_file = LocalMappedFile::new(
3333
path_buf.to_os_string().to_string_lossy().to_string(),
3434
file_metadata.len(),
3535
);

rocketmq-common/src/common/attribute/cq_type.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::str::FromStr;
1919

2020
use anyhow::anyhow;
2121

22-
#[derive(PartialEq, Default, Debug)]
22+
#[derive(PartialEq, Default, Debug, Copy, Clone)]
2323
pub enum CQType {
2424
#[default]
2525
SimpleCQ,

rocketmq-common/src/common/broker/broker_config.rs

+2
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ pub struct BrokerConfig {
110110
pub is_in_broker_container: bool,
111111
pub commercial_size_per_msg: i32,
112112
pub recover_concurrently: bool,
113+
pub duplication_enable: bool,
113114
}
114115

115116
impl Default for BrokerConfig {
@@ -148,6 +149,7 @@ impl Default for BrokerConfig {
148149
is_in_broker_container: false,
149150
commercial_size_per_msg: 4 * 1024,
150151
recover_concurrently: false,
152+
duplication_enable: false,
151153
}
152154
}
153155
}

rocketmq-store/src/base/commit_log_dispatcher.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use crate::base::dispatch_request::DispatchRequest;
1919

20-
#[trait_variant::make(CommitLogDispatcher:Send)]
21-
pub trait RocketMQCommitLogDispatcher: Clone {
22-
async fn dispatch(&mut self, dispatch_request: &DispatchRequest);
20+
pub trait CommitLogDispatcher: Send + Clone {
21+
fn dispatch(&mut self, dispatch_request: &DispatchRequest);
2322
}

rocketmq-store/src/base/dispatch_request.rs

+36-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::collections::HashMap;
18+
use std::{
19+
collections::HashMap,
20+
fmt::{Display, Formatter},
21+
};
1922

2023
#[derive(Debug, Default)]
2124
pub struct DispatchRequest {
@@ -39,3 +42,35 @@ pub struct DispatchRequest {
3942
pub next_reput_from_offset: i64,
4043
pub offset_id: String,
4144
}
45+
46+
impl Display for DispatchRequest {
47+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48+
write!(
49+
f,
50+
"DispatchRequest {{ topic: {}, queue_id: {}, commit_log_offset: {}, msg_size: {}, \
51+
tags_code: {}, store_timestamp: {}, consume_queue_offset: {}, keys: {}, success: {}, \
52+
uniq_key: {:?}, sys_flag: {}, prepared_transaction_offset: {}, properties_map: {:?}, \
53+
bit_map: {:?}, buffer_size: {}, msg_base_offset: {}, batch_size: {}, \
54+
next_reput_from_offset: {}, offset_id: {} }}",
55+
self.topic,
56+
self.queue_id,
57+
self.commit_log_offset,
58+
self.msg_size,
59+
self.tags_code,
60+
self.store_timestamp,
61+
self.consume_queue_offset,
62+
self.keys,
63+
self.success,
64+
self.uniq_key,
65+
self.sys_flag,
66+
self.prepared_transaction_offset,
67+
self.properties_map,
68+
self.bit_map,
69+
self.buffer_size,
70+
self.msg_base_offset,
71+
self.batch_size,
72+
self.next_reput_from_offset,
73+
self.offset_id
74+
)
75+
}
76+
}

rocketmq-store/src/config/message_store_config.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use serde::Deserialize;
2323
use crate::{
2424
base::store_enum::StoreType,
2525
config::{broker_role::BrokerRole, flush_disk_type::FlushDiskType},
26+
queue::single_consume_queue::CQ_STORE_UNIT_SIZE,
2627
};
2728

2829
lazy_static! {
@@ -251,9 +252,9 @@ impl Default for MessageStoreConfig {
251252
store_type: Default::default(),
252253
mapped_file_size_consume_queue: 0,
253254
enable_consume_queue_ext: false,
254-
mapped_file_size_consume_queue_ext: 0,
255+
mapped_file_size_consume_queue_ext: 48 * 1024 * 1024,
255256
mapper_file_size_batch_consume_queue: 0,
256-
bit_map_length_consume_queue_ext: 0,
257+
bit_map_length_consume_queue_ext: 64,
257258
flush_interval_commit_log: 0,
258259
commit_interval_commit_log: 0,
259260
max_recovery_commit_log_files: 0,
@@ -404,4 +405,10 @@ impl MessageStoreConfig {
404405
pub fn is_enable_rocksdb_store(&self) -> bool {
405406
self.store_type == StoreType::RocksDB
406407
}
408+
409+
pub fn get_mapped_file_size_consume_queue(&self) -> i32 {
410+
let factor = (self.mapped_file_size_consume_queue as f64 / (CQ_STORE_UNIT_SIZE as f64))
411+
.ceil() as i32;
412+
factor * CQ_STORE_UNIT_SIZE
413+
}
407414
}

rocketmq-store/src/consume_queue/mapped_file_queue.rs

+39-14
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::{
2323

2424
use log::warn;
2525
use rocketmq_common::UtilAll::offset_to_file_name;
26-
use tokio::sync::Mutex;
2726
use tracing::info;
2827

2928
use crate::{
@@ -37,7 +36,8 @@ pub struct MappedFileQueue {
3736

3837
pub(crate) mapped_file_size: u64,
3938
//pub(crate) mapped_files: Arc<Mutex<Vec<LocalMappedFile>>>,
40-
pub(crate) mapped_files: Vec<Arc<Mutex<LocalMappedFile>>>,
39+
//pub(crate) mapped_files: Vec<Arc<Mutex<LocalMappedFile>>>,
40+
pub(crate) mapped_files: Vec<Arc<LocalMappedFile>>,
4141
// pub(crate) mapped_files: Vec<LocalMappedFile>,
4242
pub(crate) allocate_mapped_file_service: Option<AllocateMappedFileService>,
4343

@@ -130,7 +130,7 @@ impl MappedFileQueue {
130130
LocalMappedFile::new(file.to_string_lossy().to_string(), self.mapped_file_size);
131131
// Set wrote, flushed, committed positions for mapped_file
132132

133-
self.mapped_files.push(Arc::new(Mutex::new(mapped_file)));
133+
self.mapped_files.push(Arc::new(mapped_file));
134134
// self.mapped_files
135135
// .push(mapped_file);
136136
info!("load {} OK", file.display());
@@ -153,7 +153,7 @@ impl MappedFileQueue {
153153
// self.mapped_files.last()
154154
// }
155155

156-
pub fn get_last_mapped_file(&self) -> Option<Arc<Mutex<LocalMappedFile>>> {
156+
pub fn get_last_mapped_file(&self) -> Option<Arc<LocalMappedFile>> {
157157
if self.mapped_files.is_empty() {
158158
return None;
159159
}
@@ -164,7 +164,7 @@ impl MappedFileQueue {
164164
&mut self,
165165
start_offset: u64,
166166
need_create: bool,
167-
) -> Option<Arc<Mutex<LocalMappedFile>>> {
167+
) -> Option<Arc<LocalMappedFile>> {
168168
let mut create_offset = -1i64;
169169
let file_size = self.mapped_file_size as i64;
170170
let mapped_file_last = self.get_last_mapped_file();
@@ -173,8 +173,8 @@ impl MappedFileQueue {
173173
create_offset = start_offset as i64 - (start_offset as i64 % file_size);
174174
}
175175
Some(ref value) => {
176-
if value.lock().await.is_full() {
177-
create_offset = value.lock().await.get_file_from_offset() as i64 + file_size
176+
if value.is_full() {
177+
create_offset = value.get_file_from_offset() as i64 + file_size
178178
}
179179
}
180180
}
@@ -184,10 +184,7 @@ impl MappedFileQueue {
184184
mapped_file_last
185185
}
186186

187-
pub fn try_create_mapped_file(
188-
&mut self,
189-
create_offset: u64,
190-
) -> Option<Arc<Mutex<LocalMappedFile>>> {
187+
pub fn try_create_mapped_file(&mut self, create_offset: u64) -> Option<Arc<LocalMappedFile>> {
191188
let next_file_path =
192189
PathBuf::from(self.store_path.clone()).join(offset_to_file_name(create_offset));
193190
let next_next_file_path = PathBuf::from(self.store_path.clone())
@@ -199,7 +196,7 @@ impl MappedFileQueue {
199196
&mut self,
200197
next_file_path: PathBuf,
201198
_next_next_file_path: PathBuf,
202-
) -> Option<Arc<Mutex<LocalMappedFile>>> {
199+
) -> Option<Arc<LocalMappedFile>> {
203200
let mut mapped_file = match self.allocate_mapped_file_service {
204201
None => LocalMappedFile::new(
205202
next_file_path.to_string_lossy().to_string(),
@@ -213,12 +210,12 @@ impl MappedFileQueue {
213210
if self.mapped_files.is_empty() {
214211
mapped_file.set_first_create_in_queue(true);
215212
}
216-
let inner = Arc::new(Mutex::new(mapped_file));
213+
let inner = Arc::new(mapped_file);
217214
self.mapped_files.push(inner.clone());
218215
Some(inner)
219216
}
220217

221-
pub fn get_mapped_files(&self) -> Vec<Arc<Mutex<LocalMappedFile>>> {
218+
pub fn get_mapped_files(&self) -> Vec<Arc<LocalMappedFile>> {
222219
self.mapped_files.to_vec()
223220
}
224221

@@ -231,6 +228,34 @@ impl MappedFileQueue {
231228
}
232229

233230
pub fn truncate_dirty_files(&mut self, offset: i64) {}
231+
232+
pub fn get_max_offset(&self) -> i64 {
233+
/*let handle = Handle::current();
234+
let mapped_file = self.get_last_mapped_file();
235+
std::thread::spawn(move || {
236+
handle.block_on(async move {
237+
match mapped_file {
238+
None => 0,
239+
Some(value) => {
240+
let file = value.lock().await;
241+
file.get_file_from_offset() as i64 + file.get_read_position() as i64
242+
}
243+
}
244+
})
245+
})
246+
.join()
247+
.unwrap()*/
248+
match self.get_last_mapped_file() {
249+
None => 0,
250+
Some(file) => file.get_file_from_offset() as i64 + file.get_read_position() as i64,
251+
}
252+
}
253+
254+
pub fn delete_last_mapped_file(&self) {
255+
unimplemented!()
256+
}
257+
258+
pub(crate) fn delete_expired_file(&self, files: Vec<Option<Arc<LocalMappedFile>>>) {}
234259
}
235260

236261
#[cfg(test)]

rocketmq-store/src/index/index_dispatch.rs

+4-12
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
use std::sync::Arc;
1919

20-
use tokio::sync::Mutex;
21-
2220
use crate::{
2321
base::{commit_log_dispatcher::CommitLogDispatcher, dispatch_request::DispatchRequest},
2422
config::message_store_config::MessageStoreConfig,
@@ -27,15 +25,12 @@ use crate::{
2725

2826
#[derive(Clone)]
2927
pub struct CommitLogDispatcherBuildIndex {
30-
index_service: Arc<Mutex<IndexService>>,
28+
index_service: IndexService,
3129
message_store_config: Arc<MessageStoreConfig>,
3230
}
3331

3432
impl CommitLogDispatcherBuildIndex {
35-
pub fn new(
36-
index_service: Arc<Mutex<IndexService>>,
37-
message_store_config: Arc<MessageStoreConfig>,
38-
) -> Self {
33+
pub fn new(index_service: IndexService, message_store_config: Arc<MessageStoreConfig>) -> Self {
3934
Self {
4035
index_service,
4136
message_store_config,
@@ -44,12 +39,9 @@ impl CommitLogDispatcherBuildIndex {
4439
}
4540

4641
impl CommitLogDispatcher for CommitLogDispatcherBuildIndex {
47-
async fn dispatch(&mut self, dispatch_request: &DispatchRequest) {
42+
fn dispatch(&mut self, dispatch_request: &DispatchRequest) {
4843
if self.message_store_config.message_index_enable {
49-
self.index_service
50-
.lock()
51-
.await
52-
.build_index(dispatch_request);
44+
self.index_service.build_index(dispatch_request);
5345
}
5446
}
5547
}

0 commit comments

Comments
 (0)