Skip to content

Commit 5bdcee0

Browse files
committed
[ISSUE #375]💥Implement IndexService💥
1 parent 9a62e45 commit 5bdcee0

File tree

8 files changed

+417
-18
lines changed

8 files changed

+417
-18
lines changed

rocketmq-common/src/utils/file_utils.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ use std::{
2121
path::{Path, PathBuf},
2222
};
2323

24+
use parking_lot::Mutex;
2425
use tracing::warn;
2526

27+
static LOCK: Mutex<()> = Mutex::new(());
28+
2629
pub fn file_to_string(file_name: &str) -> Result<String, io::Error> {
2730
if !PathBuf::from(file_name).exists() {
2831
warn!("file not exist:{}", file_name);
@@ -47,6 +50,8 @@ pub fn file_to_string_impl(file: &File) -> Result<String, io::Error> {
4750
}
4851

4952
pub fn string_to_file(str_content: &str, file_name: &str) -> io::Result<()> {
53+
let lock = LOCK.lock();
54+
5055
let bak_file = format!("{}.bak", file_name);
5156

5257
// Read previous content and create a backup
@@ -56,7 +61,7 @@ pub fn string_to_file(str_content: &str, file_name: &str) -> io::Result<()> {
5661

5762
// Write new content to the file
5863
string_to_file_not_safe(str_content, file_name)?;
59-
64+
drop(lock);
6065
Ok(())
6166
}
6267

rocketmq-store/src/config/message_store_config.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ pub struct MessageStoreConfig {
105105
pub max_transfer_count_on_message_in_disk: usize,
106106
pub access_message_in_memory_max_ratio: usize,
107107
pub message_index_enable: bool,
108-
pub max_hash_slot_num: usize,
109-
pub max_index_num: usize,
108+
pub max_hash_slot_num: u32,
109+
pub max_index_num: u32,
110110
pub max_msgs_num_batch: usize,
111111
pub message_index_safe: bool,
112112
pub ha_listen_port: usize,
@@ -287,10 +287,10 @@ impl Default for MessageStoreConfig {
287287
max_transfer_bytes_on_message_in_disk: 0,
288288
max_transfer_count_on_message_in_disk: 0,
289289
access_message_in_memory_max_ratio: 0,
290-
message_index_enable: false,
291-
max_hash_slot_num: 0,
292-
max_index_num: 0,
293-
max_msgs_num_batch: 0,
290+
message_index_enable: true,
291+
max_hash_slot_num: 5000000,
292+
max_index_num: 5000000 * 4,
293+
max_msgs_num_batch: 64,
294294
message_index_safe: false,
295295
ha_listen_port: 0,
296296
ha_send_heartbeat_interval: 0,

rocketmq-store/src/index.rs

+1
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ pub mod index_dispatch;
1919
pub mod index_file;
2020
pub mod index_header;
2121
pub mod index_service;
22+
pub mod query_offset_result;

rocketmq-store/src/index/index_file.rs

+20
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,20 @@ use crate::{
2626
};
2727

2828
const HASH_SLOT_SIZE: usize = 4;
29+
30+
/**
31+
* Each index's store unit. Format:
32+
* <pre>
33+
* ┌───────────────┬───────────────────────────────┬───────────────┬───────────────┐
34+
* │ Key HashCode │ Physical Offset │ Time Diff │ Next Index Pos│
35+
* │ (4 Bytes) │ (8 Bytes) │ (4 Bytes) │ (4 Bytes) │
36+
* ├───────────────┴───────────────────────────────┴───────────────┴───────────────┤
37+
* │ Index Store Unit │
38+
* │ │
39+
* </pre>
40+
* Each index's store unit. Size:
41+
* Key HashCode(4) + Physical Offset(8) + Time Diff(4) + Next Index Pos(4) = 20 Bytes
42+
*/
2943
const INDEX_SIZE: usize = 20;
3044
const INVALID_INDEX: i32 = 0;
3145

@@ -37,6 +51,12 @@ pub struct IndexFile {
3751
index_header: IndexHeader,
3852
}
3953

54+
impl PartialEq for IndexFile {
55+
fn eq(&self, other: &Self) -> bool {
56+
std::ptr::eq(self as *const IndexFile, other as *const IndexFile)
57+
}
58+
}
59+
4060
impl IndexFile {
4161
pub fn new(
4262
file_name: &str,

rocketmq-store/src/index/index_header.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl IndexHeader {
5858
}
5959

6060
pub fn load(&self) {
61-
let mut buffer = self.mapped_file.get_data(0, INDEX_HEADER_SIZE).unwrap();
61+
let mut buffer = self.mapped_file.get_bytes(0, INDEX_HEADER_SIZE).unwrap();
6262
self.begin_timestamp
6363
.store(buffer.get_i64(), Ordering::SeqCst);
6464
self.end_timestamp.store(buffer.get_i64(), Ordering::SeqCst);

0 commit comments

Comments
 (0)