Skip to content

[ISSUE #366]🚀Implement ReputMessageService #367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions rocketmq-store/src/base/select_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
* limitations under the License.
*/

use std::sync::Arc;

use crate::log_file::mapped_file::default_impl::DefaultMappedFile;

/// Represents the result of selecting a mapped buffer.
pub struct SelectMappedBufferResult {
/// The start offset.
pub start_offset: i64,
pub start_offset: u64,
/// The ByteBuffer.
pub byte_buffer: Vec<u8>, // Using Vec<u8> as a simplified representation of ByteBuffer in Rust
// pub byte_buffer: Vec<u8>, // Using Vec<u8> as a simplified representation of ByteBuffer in
// Rust
/// The size.
pub size: i32,
/// The mapped file.
//pub mapped_file: Option<&'a dyn MappedFile>,
/// Indicates whether the buffer is in the cache.
pub mapped_file: Option<Arc<DefaultMappedFile>>,
/// Whether the buffer is in cache.
pub is_in_cache: bool,
}
51 changes: 51 additions & 0 deletions rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@
self.mapped_files.last().cloned()
}

pub fn get_first_mapped_file(&self) -> Option<Arc<DefaultMappedFile>> {
if self.mapped_files.is_empty() {
return None;

Check warning on line 148 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L146-L148

Added lines #L146 - L148 were not covered by tests
}
self.mapped_files.first().cloned()
}

Check warning on line 151 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L150-L151

Added lines #L150 - L151 were not covered by tests

pub fn get_last_mapped_file_mut_start_offset(
&mut self,
start_offset: u64,
Expand Down Expand Up @@ -273,6 +280,50 @@
let _ = fs::remove_dir_all(path);
}
}

pub fn find_mapped_file_by_offset(

Check warning on line 284 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L284

Added line #L284 was not covered by tests
&self,
offset: i64,
return_first_on_not_found: bool,
) -> Option<Arc<DefaultMappedFile>> {
let first_mapped_file = self.get_first_mapped_file();
let last_mapped_file = self.get_last_mapped_file();
if first_mapped_file.is_some() && last_mapped_file.is_some() {
if offset < first_mapped_file.as_ref().unwrap().get_file_from_offset() as i64
|| offset
>= last_mapped_file.as_ref().unwrap().get_file_from_offset() as i64
+ self.mapped_file_size as i64

Check warning on line 295 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L289-L295

Added lines #L289 - L295 were not covered by tests
{
None

Check warning on line 297 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L297

Added line #L297 was not covered by tests
} else {
let index = offset as usize / self.mapped_file_size as usize
- first_mapped_file.as_ref().unwrap().get_file_from_offset() as usize
/ self.mapped_file_size as usize;
let target_file = self.mapped_files.get(index).cloned();
if target_file.is_some()
&& offset >= target_file.as_ref().unwrap().get_file_from_offset() as i64

Check warning on line 304 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L299-L304

Added lines #L299 - L304 were not covered by tests
{
return target_file;

Check warning on line 306 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L306

Added line #L306 was not covered by tests
}
for index in 0..self.mapped_files.len() {
let mapped_file = self.mapped_files.get(index).unwrap();
if offset >= mapped_file.get_file_from_offset() as i64
&& offset
< mapped_file.get_file_from_offset() as i64
+ self.mapped_file_size as i64

Check warning on line 313 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L308-L313

Added lines #L308 - L313 were not covered by tests
{
return Some(mapped_file.clone());

Check warning on line 315 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L315

Added line #L315 was not covered by tests
}
}
if return_first_on_not_found {
return first_mapped_file;

Check warning on line 319 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L318-L319

Added lines #L318 - L319 were not covered by tests
}
None
}

Check warning on line 322 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L321-L322

Added lines #L321 - L322 were not covered by tests
} else {
None

Check warning on line 324 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L324

Added line #L324 was not covered by tests
}
}

Check warning on line 326 in rocketmq-store/src/consume_queue/mapped_file_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/mapped_file_queue.rs#L326

Added line #L326 was not covered by tests
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-store/src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ pub trait RocketMQMessageStore: Clone {
fn get_state_machine_version(&self) -> i64;

async fn put_message(&mut self, msg: MessageExtBrokerInner) -> PutMessageResult;

fn truncate_files(&mut self, offset_to_truncate: i64) -> bool;
}
45 changes: 42 additions & 3 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
message_result::PutMessageResult,
message_status_enum::{AppendMessageStatus, PutMessageStatus},
put_message_context::PutMessageContext,
select_result::SelectMappedBufferResult,
store_checkpoint::StoreCheckpoint,
swappable::Swappable,
},
Expand Down Expand Up @@ -360,7 +361,7 @@
} else {
warn!(
"The commitlog files are deleted, and delete the consume queue
files"
files"
);
self.mapped_file_queue.set_flushed_where(0);
self.mapped_file_queue.set_committed_where(0);
Expand Down Expand Up @@ -539,7 +540,7 @@
} else {
warn!(
"The commitlog files are deleted, and delete the consume queue
files"
files"
);
self.mapped_file_queue.set_flushed_where(0);
self.mapped_file_queue.set_committed_where(0);
Expand All @@ -551,6 +552,44 @@
pub fn get_max_offset(&self) -> i64 {
self.mapped_file_queue.get_max_offset()
}

pub fn get_min_offset(&self) -> i64 {
match self.mapped_file_queue.get_first_mapped_file() {
None => -1,
Some(mapped_file) => {
if mapped_file.is_available() {
mapped_file.get_file_from_offset() as i64

Check warning on line 561 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L556-L561

Added lines #L556 - L561 were not covered by tests
} else {
self.roll_next_file(mapped_file.get_file_from_offset() as i64)

Check warning on line 563 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L563

Added line #L563 was not covered by tests
}
}

Check warning on line 565 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L565

Added line #L565 was not covered by tests
}
}

Check warning on line 567 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L567

Added line #L567 was not covered by tests

pub fn roll_next_file(&self, offset: i64) -> i64 {
let mapped_file_size = self.message_store_config.mapped_file_size_commit_log as i64;
offset + mapped_file_size - (offset % mapped_file_size)
}

Check warning on line 572 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L569-L572

Added lines #L569 - L572 were not covered by tests

pub fn get_data(&self, offset: i64) -> Option<SelectMappedBufferResult> {
self.get_data_with_option(offset, offset == 0)
}
pub fn get_data_with_option(

Check warning on line 577 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L574-L577

Added lines #L574 - L577 were not covered by tests
&self,
offset: i64,
return_first_on_not_found: bool,
) -> Option<SelectMappedBufferResult> {
let mapped_file_size = self.message_store_config.mapped_file_size_commit_log as i64;
let mapped_file = self

Check warning on line 583 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L582-L583

Added lines #L582 - L583 were not covered by tests
.mapped_file_queue
.find_mapped_file_by_offset(offset, return_first_on_not_found);
if let Some(mapped_file) = mapped_file {
let pos = (offset % mapped_file_size) as i32;
DefaultMappedFile::select_mapped_buffer(mapped_file, pos)
} else {
None

Check warning on line 590 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L586-L590

Added lines #L586 - L590 were not covered by tests
}
}

Check warning on line 592 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L592

Added line #L592 was not covered by tests
}

fn generate_key(msg: &MessageExtBrokerInner) -> String {
Expand All @@ -561,7 +600,7 @@
topic_queue_key
}

fn check_message_and_return_size(
pub fn check_message_and_return_size(

Check warning on line 603 in rocketmq-store/src/log_file/commit_log.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/commit_log.rs#L603

Added line #L603 was not covered by tests
bytes: &mut Bytes,
check_crc: bool,
check_dup_info: bool,
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-store/src/log_file/mapped_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

use std::{fs::File, io};
use std::{fs::File, io, sync::Arc};

use bytes::Bytes;
use rocketmq_common::common::message::{
Expand Down Expand Up @@ -103,7 +103,7 @@ pub trait MappedFile {

/// Selects a slice of the mapped byte buffer's sub-region behind the mapped file, starting at
/// the given position.
fn select_mapped_buffer(&self, pos: usize) -> SelectMappedBufferResult;
fn select_mapped_buffer(self: Arc<Self>, pos: i32) -> Option<SelectMappedBufferResult>;

/// Returns the mapped byte buffer behind the mapped file.
fn get_mapped_byte_buffer(&self) -> bytes::Bytes;
Expand Down
26 changes: 19 additions & 7 deletions rocketmq-store/src/log_file/mapped_file/default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
io::Write,
path::PathBuf,
ptr,
sync::atomic::{AtomicBool, AtomicI32, AtomicI64, Ordering},
sync::{
atomic::{AtomicBool, AtomicI32, AtomicI64, Ordering},
Arc,
},
};

use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
use memmap2::MmapMut;
use rocketmq_common::common::message::{
message_batch::MessageExtBatch, message_single::MessageExtBrokerInner,
Expand Down Expand Up @@ -212,7 +215,7 @@
}

fn is_available(&self) -> bool {
todo!()
self.reference_resource.available.load(Ordering::Relaxed)

Check warning on line 218 in rocketmq-store/src/log_file/mapped_file/default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_impl.rs#L218

Added line #L218 was not covered by tests
}

fn append_message<AMC: AppendMessageCallback>(
Expand Down Expand Up @@ -324,8 +327,18 @@
todo!()
}

fn select_mapped_buffer(&self, pos: usize) -> SelectMappedBufferResult {
todo!()
fn select_mapped_buffer(self: Arc<Self>, pos: i32) -> Option<SelectMappedBufferResult> {
let read_position = self.get_read_position();
if pos < read_position && read_position > 0 && self.hold() {
Some(SelectMappedBufferResult {
start_offset: self.get_file_from_offset() + pos as u64,
size: read_position - pos,
mapped_file: Some(self),

Check warning on line 336 in rocketmq-store/src/log_file/mapped_file/default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_impl.rs#L330-L336

Added lines #L330 - L336 were not covered by tests
is_in_cache: false,
})
} else {
None

Check warning on line 340 in rocketmq-store/src/log_file/mapped_file/default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_impl.rs#L340

Added line #L340 was not covered by tests
}
}

fn get_mapped_byte_buffer(&self) -> Bytes {
Expand All @@ -349,8 +362,7 @@
let read_end_position = pos + size;
if read_end_position <= read_position as usize {
if self.hold() {
let mut buffer = BytesMut::with_capacity(size);
buffer.put(&self.mmapped_file.lock()[pos..read_end_position]);
let buffer = BytesMut::from(&self.mmapped_file.lock()[pos..read_end_position]);

Check warning on line 365 in rocketmq-store/src/log_file/mapped_file/default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_impl.rs#L365

Added line #L365 was not covered by tests
Some(buffer.freeze())
} else {
debug!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
fs::{File, OpenOptions},
io::Write,
path::PathBuf,
sync::atomic::{AtomicI32, Ordering},
sync::{
atomic::{AtomicI32, Ordering},
Arc,
},
};

use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -281,7 +284,7 @@
todo!()
}

fn select_mapped_buffer(&self, pos: usize) -> SelectMappedBufferResult {
fn select_mapped_buffer(self: Arc<Self>, pos: i32) -> Option<SelectMappedBufferResult> {

Check warning on line 287 in rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs#L287

Added line #L287 was not covered by tests
todo!()
}

Expand Down
Loading
Loading