Skip to content

Commit 288297f

Browse files
authored
[ISSUE #144]🎉Add the basic code for local file in the store module (#145)
* [ISSUE #144]🎉Add the basic code for local file in the store module * fix ci error
1 parent b47a106 commit 288297f

24 files changed

+911
-38
lines changed

‎rocketmq-common/src/common.rs

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub mod mq_version;
2828
pub mod namesrv;
2929
mod sys_flag;
3030
pub use crate::common::sys_flag::topic_sys_flag as TopicSysFlag;
31+
pub mod attribute;
32+
pub mod boundary_type;
3133
pub mod broker;
3234
pub mod message;
3335
pub mod topic;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub mod attribute_enum;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub enum CQType {
18+
SimpleCQ,
19+
BatchCQ,
20+
RocksDBCQ,
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#[derive(Debug, PartialEq, Eq)]
18+
pub enum BoundaryType {
19+
Lower,
20+
Upper,
21+
}
22+
23+
impl BoundaryType {
24+
pub fn get_name(&self) -> &'static str {
25+
match self {
26+
BoundaryType::Lower => "lower",
27+
BoundaryType::Upper => "upper",
28+
}
29+
}
30+
31+
pub fn get_type(name: &str) -> Option<BoundaryType> {
32+
match name.to_lowercase().as_str() {
33+
"lower" => Some(BoundaryType::Lower),
34+
"upper" => Some(BoundaryType::Upper),
35+
_ => None,
36+
}
37+
}
38+
}

‎rocketmq-store/Cargo.toml

+33-3
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,45 @@ readme.workspace = true
1111
description.workspace = true
1212

1313
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
14+
[features]
15+
default = ["local_file_store"]
16+
local_file_store = []
17+
data_store = ["local_file_store"]
18+
1419

1520
[dependencies]
1621
rocketmq-common = {version = "0.2.0", path = "../rocketmq-common"}
1722

18-
lazy_static.workspace = true
19-
2023
#tools
2124
dirs.workspace = true
2225

26+
parking_lot.workspace = true
27+
28+
anyhow.workspace = true
29+
bytes.workspace = true
30+
thiserror.workspace = true
31+
32+
#tokio
33+
tokio.workspace = true
34+
tokio-util.workspace = true
35+
tokio-stream.workspace = true
36+
37+
#log
38+
tracing.workspace = true
39+
tracing-subscriber.workspace = true
40+
2341
#json spupport
2442
serde.workspace = true
25-
serde_json.workspace = true
43+
serde_json.workspace = true
44+
45+
rand.workspace = true
46+
lazy_static.workspace = true
47+
48+
#futures
49+
futures = "0.3"
50+
futures-util = "0.3"
51+
futures-core = "0.3"
52+
futures-sink = "0.3"
53+
futures-io = "0.3"
54+
55+
num_cpus.workspace = true

‎rocketmq-store/src/base.rs

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18+
pub(crate) mod dispatch_request;
1819
pub mod message_result;
1920
pub mod message_status_enum;
2021
pub mod select_result;
22+
pub mod swappable;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub struct DispatchRequest {}
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/// Clean up page-table on super large disk.
19+
pub trait Swappable {
20+
/// Swap map with specified parameters.
21+
///
22+
/// `reserve_num`: Number of reserved items.
23+
/// `force_swap_interval_ms`: Force swap interval in milliseconds.
24+
/// `normal_swap_interval_ms`: Normal swap interval in milliseconds.
25+
fn swap_map(&self, reserve_num: i32, force_swap_interval_ms: i64, normal_swap_interval_ms: i64);
26+
27+
/// Clean swapped map with specified force clean swap interval.
28+
///
29+
/// `force_clean_swap_interval_ms`: Force clean swap interval in milliseconds.
30+
fn clean_swapped_map(&self, force_clean_swap_interval_ms: i64);
31+
}

‎rocketmq-store/src/config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@
1717

1818
mod broker_role;
1919
mod flush_disk_type;
20-
mod message_store_config;
21-
mod store_path_config_helper;
20+
pub mod message_store_config;
21+
pub(crate) mod store_path_config_helper;

‎rocketmq-store/src/config/broker_role.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
* limitations under the License.
1616
*/
1717
#[allow(dead_code)]
18-
#[derive(Debug, Copy, Clone)]
18+
#[derive(Debug, Copy, Clone, Default)]
1919
pub enum BrokerRole {
20+
#[default]
2021
AsyncMaster,
2122
SyncMaster,
2223
Slave,

‎rocketmq-store/src/config/flush_disk_type.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
* limitations under the License.
1616
*/
1717
#[allow(dead_code)]
18-
#[derive(Debug, Copy, Clone)]
18+
#[derive(Debug, Copy, Clone, Default)]
1919
pub enum FlushDiskType {
2020
SyncFlush,
21+
22+
#[default]
2123
AsyncFlush,
2224
}

‎rocketmq-store/src/consume_queue.rs

+1
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616
*/
1717

1818
pub(crate) mod consume_queue_ext;
19+
pub(crate) mod mapped_file_queue;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use std::{fs, vec};
2+
3+
use tracing::{error, info};
4+
5+
use crate::{
6+
base::swappable::Swappable,
7+
log_file::mapped_file::{default_impl::DefaultMappedFile, MappedFile},
8+
};
9+
10+
pub struct MappedFileQueue {
11+
pub(crate) store_path: String,
12+
13+
pub(crate) mapped_file_size: u64,
14+
15+
pub(crate) mapped_files: Vec<Box<dyn MappedFile>>,
16+
17+
//AllocateMappedFileService -- todo
18+
pub(crate) flushed_where: u64,
19+
20+
pub(crate) committed_where: u64,
21+
22+
pub(crate) store_timestamp: u64,
23+
}
24+
25+
impl Swappable for MappedFileQueue {
26+
fn swap_map(
27+
&self,
28+
_reserve_num: i32,
29+
_force_swap_interval_ms: i64,
30+
_normal_swap_interval_ms: i64,
31+
) {
32+
todo!()
33+
}
34+
35+
fn clean_swapped_map(&self, _force_clean_swap_interval_ms: i64) {
36+
todo!()
37+
}
38+
}
39+
40+
impl MappedFileQueue {
41+
pub fn load(&mut self) -> bool {
42+
//list dir files
43+
match std::path::Path::new(&self.store_path).read_dir() {
44+
Ok(dir) => {
45+
let mut files = vec![];
46+
for file in dir {
47+
files.push(file.unwrap());
48+
}
49+
if files.is_empty() {
50+
return true;
51+
}
52+
self.do_load(files).unwrap()
53+
}
54+
Err(_) => false,
55+
}
56+
}
57+
58+
fn do_load(&mut self, files: Vec<fs::DirEntry>) -> anyhow::Result<bool> {
59+
// Ascending order sorting
60+
let sorted_files: Vec<_> = files.into_iter().collect();
61+
//sorted_files.sort_by(|a, b| a.file_name().cmp(&b.file_name()));
62+
63+
for (i, file) in sorted_files.iter().enumerate() {
64+
if file.path().is_dir() {
65+
continue;
66+
}
67+
68+
if file.metadata()?.len() == 0 && i == sorted_files.len() - 1 {
69+
fs::remove_file(file.path())?;
70+
error!("{} size is 0, auto delete.", file.path().display());
71+
continue;
72+
}
73+
74+
if file.metadata()?.len() != self.mapped_file_size {
75+
error!(
76+
"{} length not matched message store config value, please check it manually",
77+
file.path().display()
78+
);
79+
return Ok(false);
80+
}
81+
82+
let mapped_file = DefaultMappedFile::new(
83+
file.path().into_os_string().to_string_lossy().to_string(),
84+
self.mapped_file_size,
85+
);
86+
// Set wrote, flushed, committed positions for mapped_file
87+
88+
self.mapped_files.push(Box::new(mapped_file));
89+
info!("load {} OK", file.path().display());
90+
}
91+
92+
Ok(true)
93+
}
94+
}

‎rocketmq-store/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ mod config;
2121
mod consume_queue;
2222
mod filter;
2323
mod log_file;
24+
mod message_store;
25+
mod queue;

0 commit comments

Comments
 (0)