Skip to content

Commit 3c68cd3

Browse files
authored
[ISSUE #218]📌Implement DefaultMappedFile initialization (#219)
* [ISSUE #218]📌Implement DefaultMappedFile initialization * fix ci error * fix test case error
1 parent 32fe9c9 commit 3c68cd3

10 files changed

+700
-10
lines changed

rocketmq-store/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,7 @@ num_cpus.workspace = true
5656
tempfile = "3.10.0"
5757
log = "0.4.20"
5858

59+
memmap2 = "0.9.4"
60+
5961
[dev-dependencies]
6062
tempfile = "3.10.0"

rocketmq-store/src/base.rs

+4
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
* limitations under the License.
1616
*/
1717

18+
pub mod append_message_callback;
19+
pub mod compaction_append_msg_callback;
1820
pub(crate) mod dispatch_request;
1921
pub mod message_result;
2022
pub mod message_status_enum;
23+
pub mod put_message_context;
2124
pub mod select_result;
2225
pub mod store_enum;
2326
pub mod swappable;
27+
pub mod transient_store_pool;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use rocketmq_common::common::message::{
18+
message_batch::MessageExtBatch, message_single::MessageExtBrokerInner,
19+
};
20+
21+
use crate::base::{message_result::AppendMessageResult, put_message_context::PutMessageContext};
22+
23+
/// Write messages callback interface
24+
pub trait AppendMessageCallback {
25+
/// After message serialization, write MappedByteBuffer
26+
///
27+
/// # Arguments
28+
///
29+
/// * `file_from_offset` - The offset of the file
30+
/// * `byte_buffer` - The buffer to write
31+
/// * `max_blank` - The maximum blank space
32+
/// * `msg` - The message to write
33+
/// * `put_message_context` - The context of putting message
34+
///
35+
/// # Returns
36+
///
37+
/// The number of bytes written
38+
fn do_append(
39+
&self,
40+
file_from_offset: i64,
41+
byte_buffer: &mut [u8],
42+
max_blank: i32,
43+
msg: &MessageExtBrokerInner,
44+
put_message_context: &PutMessageContext,
45+
) -> AppendMessageResult;
46+
47+
/// After batched message serialization, write MappedByteBuffer
48+
///
49+
/// # Arguments
50+
///
51+
/// * `file_from_offset` - The offset of the file
52+
/// * `byte_buffer` - The buffer to write
53+
/// * `max_blank` - The maximum blank space
54+
/// * `message_ext_batch` - The batched message to write
55+
/// * `put_message_context` - The context of putting message
56+
///
57+
/// # Returns
58+
///
59+
/// The number of bytes written
60+
fn do_append_batch(
61+
&self,
62+
file_from_offset: i64,
63+
byte_buffer: &mut [u8],
64+
max_blank: i32,
65+
message_ext_batch: &MessageExtBatch,
66+
put_message_context: &PutMessageContext,
67+
) -> AppendMessageResult;
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use crate::base::message_result::AppendMessageResult;
18+
19+
/// Callback interface for compaction append message
20+
pub trait CompactionAppendMsgCallback {
21+
/// Append messages during compaction
22+
///
23+
/// # Arguments
24+
///
25+
/// * `bb_dest` - The destination buffer to append to
26+
/// * `file_from_offset` - The offset of the file
27+
/// * `max_blank` - The maximum blank space
28+
/// * `bb_src` - The source buffer containing the message to be appended
29+
///
30+
/// # Returns
31+
///
32+
/// The result of the append operation
33+
fn do_append(
34+
&self,
35+
bb_dest: &mut bytes::Bytes,
36+
file_from_offset: i64,
37+
max_blank: i32,
38+
bb_src: &mut bytes::Bytes,
39+
) -> AppendMessageResult;
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#[derive(Debug, Clone, Default)]
18+
pub struct PutMessageContext {
19+
topic_queue_table_key: String,
20+
phy_pos: Vec<i64>,
21+
batch_size: i32,
22+
}
23+
24+
impl PutMessageContext {
25+
pub fn new(topic_queue_table_key: String) -> Self {
26+
PutMessageContext {
27+
topic_queue_table_key,
28+
phy_pos: Vec::new(),
29+
batch_size: 0,
30+
}
31+
}
32+
33+
pub fn get_topic_queue_table_key(&self) -> &str {
34+
&self.topic_queue_table_key
35+
}
36+
37+
pub fn get_phy_pos(&self) -> &[i64] {
38+
&self.phy_pos
39+
}
40+
41+
pub fn phy_pos(&mut self, phy_pos: Vec<i64>) {
42+
self.phy_pos = phy_pos;
43+
}
44+
45+
pub fn get_batch_size(&self) -> i32 {
46+
self.batch_size
47+
}
48+
49+
pub fn batch_size(&mut self, batch_size: i32) {
50+
self.batch_size = batch_size;
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub struct TransientStorePool;

rocketmq-store/src/config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
mod broker_role;
19-
mod flush_disk_type;
18+
pub mod broker_role;
19+
pub mod flush_disk_type;
2020
pub mod message_store_config;
2121
pub(crate) mod store_path_config_helper;

rocketmq-store/src/consume_queue/mapped_file_queue.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ mod tests {
118118
#[test]
119119
fn test_load_with_files() {
120120
let temp_dir = tempfile::tempdir().unwrap();
121-
let file1_path = temp_dir.path().join("file1.txt");
122-
let file2_path = temp_dir.path().join("file2.txt");
121+
let file1_path = temp_dir.path().join("1111");
122+
let file2_path = temp_dir.path().join("2222");
123123
fs::File::create(&file1_path).unwrap();
124124
fs::File::create(&file2_path).unwrap();
125125

@@ -138,7 +138,7 @@ mod tests {
138138
#[test]
139139
fn test_load_with_empty_file() {
140140
let temp_dir = tempfile::tempdir().unwrap();
141-
let file_path = temp_dir.path().join("empty_file.txt");
141+
let file_path = temp_dir.path().join("1111");
142142
fs::File::create(&file_path).unwrap();
143143

144144
let mut queue = MappedFileQueue {
@@ -174,7 +174,7 @@ mod tests {
174174
#[test]
175175
fn test_load_with_correct_file() {
176176
let temp_dir = tempfile::tempdir().unwrap();
177-
let file_path = temp_dir.path().join("correct_file.txt");
177+
let file_path = temp_dir.path().join("1111");
178178
fs::write(&file_path, vec![0u8; 1024]).unwrap();
179179

180180
let mut queue = MappedFileQueue {

0 commit comments

Comments
 (0)