Skip to content

[ISSUE #242]🚀Implement DefaultMappedFile related functions-1 #243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions rocketmq-broker/src/broker_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ pub struct BrokerController {
pub(crate) polling_info_processor: PollingInfoProcessor,
pub(crate) ack_message_processor: AckMessageProcessor,
pub(crate) change_invisible_time_processor: ChangeInvisibleTimeProcessor,
pub(crate) send_message_processor: SendMessageProcessor,
#[cfg(feature = "local_file_store")]
pub(crate) send_message_processor: SendMessageProcessor<LocalFileMessageStore>,
pub(crate) reply_message_processor: ReplyMessageProcessor,
pub(crate) notify_message_arriving_listener: NotifyMessageArrivingListener,
pub(crate) default_consumer_ids_change_listener: DefaultConsumerIdsChangeListener,
Expand Down Expand Up @@ -257,7 +258,8 @@ impl BrokerController {

fn register_processor(&mut self) {
let broker_server = self.broker_server.as_mut().unwrap();
let send_message_processor = Arc::new(SendMessageProcessor::default());
let send_message_processor =
Arc::new(SendMessageProcessor::<LocalFileMessageStore>::default());
broker_server.register_processor(RequestCode::SendMessage, send_message_processor.clone());
broker_server
.register_processor(RequestCode::SendMessageV2, send_message_processor.clone());
Expand Down
26 changes: 11 additions & 15 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ use rocketmq_remoting::{
},
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};
use rocketmq_store::{
base::message_result::PutMessageResult, log_file::MessageStore,
message_store::local_file_store::LocalFileMessageStore,
};
use rocketmq_store::{base::message_result::PutMessageResult, log_file::MessageStore};

use crate::{
broker_config::BrokerConfig,
Expand All @@ -53,31 +50,31 @@ use crate::{
},
};

pub struct SendMessageProcessor {
pub struct SendMessageProcessor<MS> {
inner: SendMessageProcessorInner,
topic_queue_mapping_manager: Arc<parking_lot::RwLock<TopicQueueMappingManager>>,
topic_config_manager: Arc<parking_lot::RwLock<TopicConfigManager>>,
broker_config: Arc<parking_lot::RwLock<BrokerConfig>>,
#[cfg(feature = "local_file_store")]
message_store: Arc<parking_lot::RwLock<LocalFileMessageStore>>,
message_store: Arc<parking_lot::RwLock<MS>>,
}

impl Default for SendMessageProcessor {
impl<MS> Default for SendMessageProcessor<MS> {
fn default() -> Self {
#[cfg(feature = "local_file_store")]
/* #[cfg(feature = "local_file_store")]
Self {
inner: SendMessageProcessorInner::default(),
topic_queue_mapping_manager: Arc::new(parking_lot::RwLock::new(
TopicQueueMappingManager::default(),
)),
topic_config_manager: Arc::new(parking_lot::RwLock::new(TopicConfigManager::default())),
broker_config: Arc::new(parking_lot::RwLock::new(BrokerConfig::default())),
message_store: Arc::new(parking_lot::RwLock::new(LocalFileMessageStore::default())),
}
message_store: Arc::new(parking_lot::RwLock::new(Default::default())),
}*/
unimplemented!()
}
}

impl RequestProcessor for SendMessageProcessor {
impl<MS: MessageStore + Send> RequestProcessor for SendMessageProcessor<MS> {
fn process_request(
&mut self,
ctx: ConnectionHandlerContext,
Expand Down Expand Up @@ -129,11 +126,10 @@ impl RequestProcessor for SendMessageProcessor {
}

#[allow(unused_variables)]
impl SendMessageProcessor {
#[cfg(feature = "local_file_store")]
impl<MS: MessageStore + Send> SendMessageProcessor<MS> {
pub fn new(
topic_queue_mapping_manager: Arc<parking_lot::RwLock<TopicQueueMappingManager>>,
message_store: Arc<parking_lot::RwLock<LocalFileMessageStore>>,
message_store: Arc<parking_lot::RwLock<MS>>,
) -> Self {
Self {
inner: SendMessageProcessorInner::default(),
Expand Down
6 changes: 4 additions & 2 deletions rocketmq-store/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* limitations under the License.
*/

use memmap2::MmapMut;

pub mod append_message_callback;
pub mod compaction_append_msg_callback;
pub(crate) mod dispatch_request;
Expand All @@ -27,12 +29,12 @@ pub mod swappable;
pub mod transient_store_pool;

pub struct ByteBuffer<'a> {
data: &'a mut [u8],
data: &'a mut MmapMut,
position: i64,
}

impl<'a> ByteBuffer<'a> {
pub fn new(data: &'a mut [u8], position: i64) -> ByteBuffer<'a> {
pub fn new(data: &'a mut MmapMut, position: i64) -> ByteBuffer<'a> {
ByteBuffer { data, position }
}

Expand Down
127 changes: 72 additions & 55 deletions rocketmq-store/src/base/append_message_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;

use rocketmq_common::common::message::{
message_batch::MessageExtBatch, message_single::MessageExtBrokerInner,
use std::{io::Write, mem, sync::Arc};

use bytes::{Buf, BufMut};
use rocketmq_common::{
common::{
message::{
message_batch::MessageExtBatch,
message_single::{MessageExt, MessageExtBrokerInner},
},
sys_flag::message_sys_flag::MessageSysFlag,
},
UtilAll,
};

use crate::{
base::{
message_result::AppendMessageResult, put_message_context::PutMessageContext, ByteBuffer,
},
config::message_store_config::MessageStoreConfig,
log_file::commit_log::CRC32_RESERVED_LEN,
log_file::commit_log::{CommitLog, BLANK_MAGIC_CODE, CRC32_RESERVED_LEN},
};

/// Write messages callback interface
Expand All @@ -44,11 +52,11 @@ pub trait AppendMessageCallback {
///
/// The number of bytes written
fn do_append(
&self,
&mut self,
file_from_offset: i64,
byte_buffer: &mut ByteBuffer,
max_blank: i32,
msg: &MessageExtBrokerInner,
msg: &mut MessageExtBrokerInner,
put_message_context: &PutMessageContext,
) -> AppendMessageResult;

Expand Down Expand Up @@ -99,115 +107,124 @@ impl DefaultAppendMessageCallback {
#[allow(unused_variables)]
impl AppendMessageCallback for DefaultAppendMessageCallback {
fn do_append(
&self,
&mut self,
file_from_offset: i64,
byte_buffer: &mut ByteBuffer,
max_blank: i32,
msg_inner: &MessageExtBrokerInner,
msg_inner: &mut MessageExtBrokerInner,
put_message_context: &PutMessageContext,
) -> AppendMessageResult {
/* let mut pre_encode_buffer = msg_inner.encoded_buff.clone();
let mut pre_encode_buffer = bytes::BytesMut::from(msg_inner.encoded_buff.as_ref());
let is_multi_dispatch_msg = self.message_store_config.enable_multi_dispatch
&& CommitLog::is_multi_dispatch_msg(&msg_inner);
&& CommitLog::is_multi_dispatch_msg(msg_inner);
if is_multi_dispatch_msg {
unimplemented!()
}
let msg_len = pre_encode_buffer.get_i32();
let size = std::mem::size_of::<i32>();
//can optimize?
let msg_len = pre_encode_buffer.clone().get_i32();
//physical offset
let wrote_offset = file_from_offset + byte_buffer.position;
pre_encode_buffer.advance(size);

let msg_id_supplier = || {
let sys_flag = msg_inner.get_sys_flag();
let sys_flag = msg_inner.sys_flag();
let msg_id_len = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
4 + 4 + 8
} else {
16 + 4 + 8
};

let mut msg_id_buffer = bytes::BytesMut::from(
MessageExt::socket_address_2_byte_buffer(&msg_inner.store_host()),
MessageExt::socket_address_2_byte_buffer(&msg_inner.store_host()).as_ref(),
);
msg_id_buffer.put_i64(wrote_offset);
UtilAll::bytes_to_string(msg_id_buffer.array())
UtilAll::bytes_to_string(msg_id_buffer.as_ref())
};

let queue_offset = msg_inner.queue_offset();
let mut queue_offset = msg_inner.queue_offset();

let message_num = get_message_num(&msg_inner);
let message_num = CommitLog::get_message_num(msg_inner);

let tran_type = MessageSysFlag::get_transaction_value(msg_inner.get_sys_flag());
let tran_type = MessageSysFlag::get_transaction_value(msg_inner.sys_flag());
match tran_type {
MessageSysFlag::TRANSACTION_PREPARED_TYPE
| MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => {
queue_offset = Some(0);
queue_offset = 0;
}
MessageSysFlag::TRANSACTION_NOT_TYPE | MessageSysFlag::TRANSACTION_COMMIT_TYPE | _ => {}
_ => {}
}

if (msg_len + END_FILE_MIN_BLANK_LENGTH as i32) > max_blank {
msg_store_item_memory.clear();
msg_store_item_memory.put_int(max_blank);
msg_store_item_memory.put_int(CommitLog::BLANK_MAGIC_CODE);
byte_buffer.put(&msg_store_item_memory.array()[0..8]);
let begin_time_mills = CommitLog::default_message_store().now();
return AppendMessageResult::new(
AppendMessageStatus::END_OF_FILE,
if (msg_len + END_FILE_MIN_BLANK_LENGTH) > max_blank {
self.msg_store_item_memory.clear();
self.msg_store_item_memory.put_i32(max_blank);
self.msg_store_item_memory.put_i32(BLANK_MAGIC_CODE);
// byte_buffer.put(&self.msg_store_item_memory[0..8]);
/* let begin_time_mills = CommitLog::default_message_store().now(); */
/*return AppendMessageResult::new(
AppendMessageStatus::EndOfFile,
wrote_offset,
max_blank as usize,
msg_id_supplier,
msg_inner.get_store_timestamp(),
queue_offset,
CommitLog::default_message_store().now() - begin_time_mills,
None,
);
);*/
return AppendMessageResult::default();
}

let mut pos = 4 + 4 + 4 + 4 + 4;
pre_encode_buffer.put_long(pos, queue_offset.unwrap());
pre_encode_buffer[pos..(pos + mem::size_of::<i64>())]
.copy_from_slice(&queue_offset.to_be_bytes());
pos += 8;
pre_encode_buffer.put_long(pos, file_from_offset + byte_buffer.position() as i64);
let ip_len = if (msg_inner.get_sys_flag() & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {

pre_encode_buffer[pos..(pos + mem::size_of::<i64>())]
.copy_from_slice(&(file_from_offset + byte_buffer.get_position()).to_be_bytes());
let ip_len = if (msg_inner.sys_flag() & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
4 + 4
} else {
16 + 4
};
pos += 8 + 4 + 8 + ip_len;
pre_encode_buffer.put_long(pos, msg_inner.get_store_timestamp());
if enabled_append_prop_crc {
// refresh store time stamp in lock
pre_encode_buffer[pos..(pos + mem::size_of::<i64>())]
.copy_from_slice(msg_inner.store_timestamp().to_be_bytes().as_ref());
/* if enabled_append_prop_crc {
let check_size = msg_len - MessageDecoder::CRC32_RESERVED_LENGTH;
let mut tmp_buffer = pre_encode_buffer.duplicate();
tmp_buffer.limit(tmp_buffer.position() + check_size as usize);
let crc32 = MessageDecoder::crc32(&tmp_buffer);
tmp_buffer.limit(tmp_buffer.position() + MessageDecoder::CRC32_RESERVED_LENGTH);
MessageDecoder::create_crc32(&mut tmp_buffer, crc32);
}

let begin_time_mills = CommitLog::default_message_store().now();
CommitLog::default_message_store()
.perf_counter()
.start_tick("WRITE_MEMORY_TIME_MS");
byte_buffer.put(&pre_encode_buffer);
CommitLog::default_message_store()
.perf_counter()
.end_tick("WRITE_MEMORY_TIME_MS");
msg_inner.set_encoded_buff(None);

if is_multi_dispatch_msg {
}*/

//let begin_time_mills = CommitLog::default_message_store().now();
/* CommitLog::default_message_store()
.perf_counter()
.start_tick("WRITE_MEMORY_TIME_MS");*/
byte_buffer
.get_data_mut()
.write_all(&pre_encode_buffer)
.unwrap();
/* CommitLog::default_message_store()
.perf_counter()
.end_tick("WRITE_MEMORY_TIME_MS");*/
msg_inner.encoded_buff.clear();

/* if is_multi_dispatch_msg {
CommitLog::multi_dispatch().update_multi_queue_offset(&msg_inner);
}
}*/

AppendMessageResult::new(
/*AppendMessageResult::new(
AppendMessageStatus::PutOk,
wrote_offset,
msg_len as usize,
msg_id_supplier,
msg_inner.get_store_timestamp(),
msg_inner.store_timestamp(),
queue_offset,
CommitLog::default_message_store().now() - begin_time_mills,
0,
Some(message_num),
);*/
unimplemented!()
)*/
AppendMessageResult::default()
}

fn do_append_batch(
Expand Down
22 changes: 15 additions & 7 deletions rocketmq-store/src/base/message_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
* limitations under the License.
*/

use std::{cell::RefCell, rc::Rc};

use crate::base::{
message_status_enum::{AppendMessageStatus, GetMessageStatus, PutMessageStatus},
select_result::SelectMappedBufferResult,
};

/// Represents the result of an append message operation.
#[derive(Default)]
pub struct AppendMessageResult {
/// Return code.
pub status: AppendMessageStatus,
Expand All @@ -34,7 +31,7 @@ pub struct AppendMessageResult {
/// Message ID.
pub msg_id: String,
/// Message ID supplier.
pub msg_id_supplier: Option<Rc<RefCell<dyn Fn() -> String>>>,
pub msg_id_supplier: &'static dyn Fn() -> String,
/// Message storage timestamp.
pub store_timestamp: i64,
/// Consume queue's offset (step by one).
Expand All @@ -45,13 +42,24 @@ pub struct AppendMessageResult {
pub msg_num: i32,
}

#[derive(Default)]
pub struct PutMessageResult {
put_message_status: PutMessageStatus,
append_message_result: Option<AppendMessageResult>,
remote_put: bool,
}

impl Default for AppendMessageResult {
fn default() -> Self {
unimplemented!()
}
}

impl Default for PutMessageResult {
fn default() -> Self {
unimplemented!()
}
}

impl PutMessageResult {
pub fn new(
put_message_status: PutMessageStatus,
Expand All @@ -67,9 +75,9 @@ impl PutMessageResult {
}

/// Represents the result of getting a message.
pub struct GetMessageResult {
pub struct GetMessageResult<'a> {
/// The list of mapped buffer results.
pub message_mapped_list: Vec<SelectMappedBufferResult>,
pub message_mapped_list: Vec<SelectMappedBufferResult<'a>>,
/// The list of message buffers.
pub message_buffer_list: Vec<Vec<u8>>, /* Using Vec<u8> as a simplified representation of
* ByteBuffer in Rust */
Expand Down
Loading