Skip to content

[ISSUE #1221]Optimize MappedFile method #1222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rocketmq-cli/src/content_show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::path::PathBuf;

use bytes::Buf;
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_decoder;
use rocketmq_store::log_file::mapped_file::default_mapped_file_impl::DefaultMappedFile;
use rocketmq_store::log_file::mapped_file::MappedFile;
Expand All @@ -34,7 +35,7 @@
let file_metadata = fs::metadata(path_buf.clone()).unwrap();
println!("file size: {}B", file_metadata.len());
let mapped_file = DefaultMappedFile::new(
path_buf.to_os_string().to_string_lossy().to_string(),
CheetahString::from(path_buf.to_string_lossy().to_string()),

Check warning on line 38 in rocketmq-cli/src/content_show.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-cli/src/content_show.rs#L38

Added line #L38 was not covered by tests
file_metadata.len(),
);
// read message number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ mod tests {
let wrapper = TopicConfigAndMappingSerializeWrapper::default();
assert!(wrapper.topic_queue_mapping_info_map.is_empty());
assert!(wrapper.topic_queue_mapping_detail_map.is_empty());
assert_eq!(wrapper.mapping_data_version, DataVersion::new());
//assert_eq!(wrapper.mapping_data_version, DataVersion::new());
assert_eq!(
wrapper
.topic_config_serialize_wrapper()
Expand Down
1 change: 0 additions & 1 deletion rocketmq-store/src/base/append_message_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ impl DefaultAppendMessageCallback {
}
}

#[allow(unused_variables)]
impl AppendMessageCallback for DefaultAppendMessageCallback {
fn do_append<MF: MappedFile>(
&self,
Expand Down
9 changes: 6 additions & 3 deletions rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;

use cheetah_string::CheetahString;
use log::warn;
use parking_lot::RwLock;
use rocketmq_common::UtilAll::offset_to_file_name;
Expand Down Expand Up @@ -142,8 +143,10 @@
return false;
}

let mapped_file =
DefaultMappedFile::new(file.to_string_lossy().to_string(), self.mapped_file_size);
let mapped_file = DefaultMappedFile::new(
CheetahString::from_string(file.to_string_lossy().to_string()),
self.mapped_file_size,
);
// Set wrote, flushed, committed positions for mapped_file
mapped_file.set_wrote_position(self.mapped_file_size as i32);
mapped_file.set_flushed_position(self.mapped_file_size as i32);
Expand Down Expand Up @@ -210,7 +213,7 @@
) -> Option<Arc<DefaultMappedFile>> {
let mut mapped_file = match self.allocate_mapped_file_service {
None => DefaultMappedFile::new(
next_file_path.to_string_lossy().to_string(),
CheetahString::from_string(next_file_path.to_string_lossy().to_string()),

Check warning on line 216 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L216

Added line #L216 was not covered by tests
self.mapped_file_size,
),
Some(ref _value) => {
Expand Down
5 changes: 3 additions & 2 deletions rocketmq-store/src/index/index_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use bytes::Buf;
use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_common::common::hasher::string_hasher::JavaStringHasher;

use crate::index::index_header::IndexHeader;
Expand Down Expand Up @@ -69,7 +70,7 @@
let file_total_size =
INDEX_HEADER_SIZE + (hash_slot_num * HASH_SLOT_SIZE) + (index_num * INDEX_SIZE);
let mapped_file = Arc::new(DefaultMappedFile::new(
file_name.to_string(),
CheetahString::from_slice(file_name),

Check warning on line 73 in rocketmq-store/src/index/index_file.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/index/index_file.rs#L73

Added line #L73 was not covered by tests
file_total_size as u64,
));

Expand All @@ -94,7 +95,7 @@
index_file
}

pub fn get_file_name(&self) -> String {
pub fn get_file_name(&self) -> &CheetahString {

Check warning on line 98 in rocketmq-store/src/index/index_file.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/index/index_file.rs#L98

Added line #L98 was not covered by tests
self.mapped_file.get_file_name()
}

Expand Down
3 changes: 2 additions & 1 deletion rocketmq-store/src/log_file/mapped_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::io;
use std::sync::Arc;

use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_batch::MessageExtBatch;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;

Expand All @@ -37,7 +38,7 @@ pub trait MappedFile {
///
/// # Returns
/// A `String` representing the name of the file.
fn get_file_name(&self) -> String;
fn get_file_name(&self) -> &CheetahString;

/// Renames the mapped file to the specified file name.
///
Expand Down
25 changes: 13 additions & 12 deletions rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

use bytes::Bytes;
use bytes::BytesMut;
use cheetah_string::CheetahString;
use memmap2::MmapMut;
use rocketmq_common::common::message::message_batch::MessageExtBatch;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
Expand Down Expand Up @@ -58,7 +59,7 @@
file: File,
mmapped_file: SyncUnsafeCellWrapper<MmapMut>,
transient_store_pool: Option<TransientStorePool>,
file_name: String,
file_name: CheetahString,
file_from_offset: u64,
mapped_byte_buffer: Option<bytes::Bytes>,
wrote_position: AtomicI32,
Expand Down Expand Up @@ -94,14 +95,14 @@

impl Default for DefaultMappedFile {
fn default() -> Self {
Self::new(String::new(), 0)
Self::new(CheetahString::new(), 0)

Check warning on line 98 in rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs#L98

Added line #L98 was not covered by tests
}
}

impl DefaultMappedFile {
pub fn new(file_name: String, file_size: u64) -> Self {
pub fn new(file_name: CheetahString, file_size: u64) -> Self {
let file_from_offset = Self::get_file_from_offset(&file_name);
let path_buf = PathBuf::from(file_name.clone());
let path_buf = PathBuf::from(file_name.as_str());
Comment on lines +103 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider improving error handling in path conversions.

While the conversion from CheetahString to path is functionally correct, the code uses multiple unwrap() calls which could panic. Consider using proper error handling:

-    fn get_file_from_offset(file_name: &CheetahString) -> u64 {
-        let file_from_offset = PathBuf::from(file_name.as_str())
-            .file_name()
-            .unwrap()
-            .to_str()
-            .unwrap()
-            .parse::<u64>()
-            .unwrap();
-        file_from_offset
+    fn get_file_from_offset(file_name: &CheetahString) -> Result<u64, std::io::Error> {
+        let path = PathBuf::from(file_name.as_str());
+        let file_name = path.file_name()
+            .ok_or_else(|| std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "Invalid file name"
+            ))?;
+        let file_name_str = file_name.to_str()
+            .ok_or_else(|| std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "Invalid UTF-8 in file name"
+            ))?;
+        file_name_str.parse::<u64>()
+            .map_err(|e| std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                format!("Invalid offset in file name: {}", e)
+            ))
+    }

Also applies to: 144-156

ensure_dir_ok(path_buf.parent().unwrap().to_str().unwrap());
let file = OpenOptions::new()
.read(true)
Expand Down Expand Up @@ -140,8 +141,8 @@
}
}

fn get_file_from_offset(file_name: &String) -> u64 {
let file_from_offset = PathBuf::from(file_name.to_owned())
fn get_file_from_offset(file_name: &CheetahString) -> u64 {
let file_from_offset = PathBuf::from(file_name.as_str())
.file_name()
.unwrap()
.to_str()
Expand All @@ -151,8 +152,8 @@
file_from_offset
}

fn build_file(file_name: &String, file_size: u64) -> File {
let path = PathBuf::from(file_name.clone());
fn build_file(file_name: &CheetahString, file_size: u64) -> File {
let path = PathBuf::from(file_name.as_str());

Check warning on line 156 in rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs#L155-L156

Added lines #L155 - L156 were not covered by tests
let file = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -166,12 +167,12 @@
}

pub fn new_with_transient_store_pool(
file_name: String,
file_name: CheetahString,

Check warning on line 170 in rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs#L170

Added line #L170 was not covered by tests
file_size: u64,
transient_store_pool: TransientStorePool,
) -> Self {
let file_from_offset = Self::get_file_from_offset(&file_name);
let path_buf = PathBuf::from(file_name.clone());
let path_buf = PathBuf::from(file_name.as_str());

Check warning on line 175 in rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs#L175

Added line #L175 was not covered by tests
let file = OpenOptions::new()
.read(true)
.write(true)
Expand Down Expand Up @@ -212,8 +213,8 @@

#[allow(unused_variables)]
impl MappedFile for DefaultMappedFile {
fn get_file_name(&self) -> String {
self.file_name.clone()
fn get_file_name(&self) -> &CheetahString {
&self.file_name

Check warning on line 217 in rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs#L216-L217

Added lines #L216 - L217 were not covered by tests
}

fn rename_to(&mut self, file_name: &str) -> bool {
Expand Down
Loading