Skip to content

Commit c9ec139

Browse files
committed
[ISSUE mxsm#512]🔥Implementing Functionality do append batch messages🚀
1 parent 3bc1636 commit c9ec139

File tree

8 files changed

+190
-10
lines changed

8 files changed

+190
-10
lines changed

rocketmq-common/src/utils/message_utils.rs

+84
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,57 @@ pub fn build_message_id(socket_addr: SocketAddr, wrote_offset: i64) -> String {
9090
bytes_to_string(msg_id_vec.as_slice())
9191
}
9292

93+
pub fn socket_address_to_vec(socket_addr: SocketAddr) -> Vec<u8> {
94+
match socket_addr {
95+
SocketAddr::V4(addr) => {
96+
let mut msg_id_vec = Vec::<u8>::with_capacity(8);
97+
msg_id_vec.extend_from_slice(&addr.ip().octets());
98+
msg_id_vec.put_i32(addr.port() as i32);
99+
msg_id_vec
100+
}
101+
SocketAddr::V6(addr) => {
102+
let mut msg_id_vec = Vec::<u8>::with_capacity(20);
103+
msg_id_vec.extend_from_slice(&addr.ip().octets());
104+
msg_id_vec.put_i32(addr.port() as i32);
105+
msg_id_vec
106+
}
107+
}
108+
}
109+
110+
pub fn build_batch_message_id(
111+
socket_addr: SocketAddr,
112+
store_host_length: i32,
113+
batch_size: usize,
114+
phy_pos: &[i64],
115+
) -> String {
116+
if batch_size == 0 {
117+
return String::new();
118+
}
119+
let msg_id_len = (store_host_length + 8) as usize;
120+
let mut msg_id_vec = vec![0u8; msg_id_len];
121+
msg_id_vec[..store_host_length as usize].copy_from_slice(&socket_address_to_vec(socket_addr));
122+
let mut message_id = String::with_capacity(batch_size * msg_id_len * 2 + batch_size - 1);
123+
for (index, value) in phy_pos.iter().enumerate() {
124+
msg_id_vec[msg_id_len - 8..msg_id_len].copy_from_slice(&value.to_be_bytes());
125+
if index != 0 {
126+
message_id.push(',');
127+
}
128+
message_id.push_str(&bytes_to_string(&msg_id_vec));
129+
}
130+
message_id
131+
}
132+
93133
pub fn parse_message_id(_msg_id: impl Into<String>) -> (SocketAddr, i64) {
94134
unimplemented!()
95135
}
96136

97137
#[cfg(test)]
98138
mod tests {
139+
use std::net::Ipv4Addr;
140+
141+
use bytes::Bytes;
142+
use bytes::BytesMut;
143+
99144
use super::*;
100145
use crate::common::message::message_single::MessageExt;
101146

@@ -154,4 +199,43 @@ mod tests {
154199
let result = build_message_id(socket_addr, wrote_offset);
155200
assert_eq!(result, "7F0000010000000C0000000000000001");
156201
}
202+
203+
#[test]
204+
fn build_batch_message_id_creates_correct_id_for_single_position() {
205+
let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
206+
let store_host_length = 8;
207+
let batch_size = 1;
208+
let phy_pos = vec![12345];
209+
210+
let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
211+
212+
assert_eq!(result, "7F00000100001F900000000000003039");
213+
}
214+
215+
#[test]
216+
fn build_batch_message_id_creates_correct_id_for_multiple_positions() {
217+
let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
218+
let store_host_length = 8;
219+
let batch_size = 2;
220+
let phy_pos = vec![12345, 67890];
221+
222+
let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
223+
224+
assert_eq!(
225+
result,
226+
"7F00000100001F900000000000003039,7F00000100001F900000000000010932"
227+
);
228+
}
229+
230+
#[test]
231+
fn build_batch_message_id_creates_empty_id_for_no_positions() {
232+
let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
233+
let store_host_length = 8;
234+
let batch_size = 0;
235+
let phy_pos = vec![];
236+
237+
let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
238+
239+
assert_eq!(result, "");
240+
}
157241
}

rocketmq-store/src/base/append_message_callback.rs

+91-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use rocketmq_common::common::message::message_batch::MessageExtBatch;
2424
use rocketmq_common::common::message::message_single::MessageExtBrokerInner;
2525
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
2626
use rocketmq_common::utils::message_utils;
27+
use rocketmq_common::MessageUtils::build_batch_message_id;
28+
use rocketmq_common::TimeUtils::get_current_millis;
2729

2830
use crate::base::message_result::AppendMessageResult;
2931
use crate::base::message_status_enum::AppendMessageStatus;
@@ -65,7 +67,8 @@ pub trait AppendMessageCallback {
6567
mapped_file: &MF,
6668
max_blank: i32,
6769
msg: &mut MessageExtBatch,
68-
put_message_context: &PutMessageContext,
70+
put_message_context: &mut PutMessageContext,
71+
enabled_append_prop_crc: bool,
6972
) -> AppendMessageResult;
7073
}
7174

@@ -179,9 +182,93 @@ impl AppendMessageCallback for DefaultAppendMessageCallback {
179182
file_from_offset: i64,
180183
mapped_file: &MF,
181184
max_blank: i32,
182-
msg: &mut MessageExtBatch,
183-
put_message_context: &PutMessageContext,
185+
msg_batch: &mut MessageExtBatch,
186+
put_message_context: &mut PutMessageContext,
187+
enabled_append_prop_crc: bool,
184188
) -> AppendMessageResult {
185-
todo!()
189+
let wrote_offset = file_from_offset + mapped_file.get_wrote_position() as i64;
190+
let queue_offset = msg_batch.message_ext_broker_inner.queue_offset();
191+
let begin_queue_offset = queue_offset;
192+
193+
let begin_time_mills = get_current_millis();
194+
195+
// Assuming get_encoded_buff returns Option<ByteBuffer>
196+
let mut pre_encode_buffer = msg_batch.encoded_buff.take().unwrap();
197+
let sys_flag = msg_batch.message_ext_broker_inner.sys_flag();
198+
let born_host_length = if sys_flag & MessageSysFlag::BORNHOST_V6_FLAG == 0 {
199+
4 + 4
200+
} else {
201+
16 + 4
202+
};
203+
let store_host_length = if sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG == 0 {
204+
4 + 4
205+
} else {
206+
16 + 4
207+
};
208+
let mut total_msg_len = 0;
209+
let mut msg_num = 0;
210+
let mut msg_pos = 0;
211+
let mut index = 0;
212+
while total_msg_len < pre_encode_buffer.len() as i32 {
213+
let msg_len = i32::from_be_bytes(
214+
pre_encode_buffer[total_msg_len as usize..(total_msg_len + 4) as usize]
215+
.try_into()
216+
.unwrap(),
217+
);
218+
total_msg_len += msg_len;
219+
if total_msg_len + END_FILE_MIN_BLANK_LENGTH > max_blank {
220+
let mut bytes = BytesMut::with_capacity(END_FILE_MIN_BLANK_LENGTH as usize);
221+
bytes.put_i32(max_blank);
222+
bytes.put_i32(BLANK_MAGIC_CODE);
223+
mapped_file.append_message_bytes(&bytes.freeze());
224+
return AppendMessageResult {
225+
status: AppendMessageStatus::EndOfFile,
226+
wrote_offset,
227+
wrote_bytes: max_blank,
228+
msg_id: "".to_string(),
229+
store_timestamp: msg_batch.message_ext_broker_inner.store_timestamp(),
230+
logics_offset: begin_queue_offset,
231+
..Default::default()
232+
};
233+
}
234+
let mut pos = msg_pos + 20;
235+
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_be_bytes());
236+
pos += 8;
237+
let phy_pos = wrote_offset + total_msg_len as i64 - msg_len as i64;
238+
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&phy_pos.to_be_bytes());
239+
pos += 8 + 4 + 8 + born_host_length;
240+
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(
241+
&msg_batch
242+
.message_ext_broker_inner
243+
.store_timestamp()
244+
.to_be_bytes(),
245+
);
246+
if enabled_append_prop_crc {
247+
let _check_size = msg_len - self.crc32_reserved_length;
248+
}
249+
put_message_context.get_phy_pos_mut()[index] = phy_pos;
250+
msg_num += 1;
251+
msg_pos += msg_len as usize;
252+
index += 1;
253+
}
254+
255+
let bytes = pre_encode_buffer.freeze();
256+
mapped_file.append_message_bytes(&bytes);
257+
let msg_id = build_batch_message_id(
258+
msg_batch.message_ext_broker_inner.store_host(),
259+
store_host_length,
260+
put_message_context.get_batch_size() as usize,
261+
put_message_context.get_phy_pos(),
262+
);
263+
AppendMessageResult {
264+
status: AppendMessageStatus::PutOk,
265+
wrote_offset,
266+
wrote_bytes: total_msg_len,
267+
msg_id,
268+
store_timestamp: msg_batch.message_ext_broker_inner.store_timestamp(),
269+
logics_offset: begin_queue_offset,
270+
msg_num,
271+
..Default::default()
272+
}
186273
}
187274
}

rocketmq-store/src/base/put_message_context.rs

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ impl PutMessageContext {
4242
self.phy_pos = phy_pos;
4343
}
4444

45+
pub fn get_phy_pos_mut(&mut self) -> &mut [i64] {
46+
&mut self.phy_pos
47+
}
48+
4549
pub fn get_batch_size(&self) -> i32 {
4650
self.batch_size
4751
}

rocketmq-store/src/log_file/commit_log.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ impl CommitLog {
282282
.store_timestamp = get_current_millis() as i64;
283283
let tran_type =
284284
MessageSysFlag::get_transaction_value(msg_batch.message_ext_broker_inner.sys_flag());
285-
if MessageSysFlag::TRANSACTION_NOT_TYPE == tran_type {
285+
if MessageSysFlag::TRANSACTION_NOT_TYPE != tran_type {
286286
return PutMessageResult::new_default(PutMessageStatus::MessageIllegal);
287287
}
288288
if msg_batch
@@ -374,6 +374,7 @@ impl CommitLog {
374374
&mut msg_batch,
375375
self.append_message_callback.as_ref(),
376376
&mut put_message_context,
377+
self.enabled_append_prop_crc,
377378
);
378379
let put_message_result = match result.status {
379380
AppendMessageStatus::PutOk => {
@@ -403,6 +404,7 @@ impl CommitLog {
403404
&mut msg_batch,
404405
self.append_message_callback.as_ref(),
405406
&mut put_message_context,
407+
self.enabled_append_prop_crc,
406408
);
407409
if AppendMessageStatus::PutOk == result.status {
408410
PutMessageResult::new_append_result(PutMessageStatus::PutOk, Some(result))
@@ -865,7 +867,7 @@ impl CommitLog {
865867
} else {
866868
warn!(
867869
"The commitlog files are deleted, and delete the consume queue
868-
files"
870+
files"
869871
);
870872
self.mapped_file_queue.set_flushed_where(0);
871873
self.mapped_file_queue.set_committed_where(0);
@@ -1045,7 +1047,7 @@ impl CommitLog {
10451047
} else {
10461048
warn!(
10471049
"The commitlog files are deleted, and delete the consume queue
1048-
files"
1050+
files"
10491051
);
10501052
self.mapped_file_queue.set_flushed_where(0);
10511053
self.mapped_file_queue.set_committed_where(0);

rocketmq-store/src/log_file/mapped_file.rs

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub trait MappedFile {
6767
message: &mut MessageExtBatch,
6868
message_callback: &AMC,
6969
put_message_context: &mut PutMessageContext,
70+
enabled_append_prop_crc: bool,
7071
) -> AppendMessageResult;
7172

7273
fn append_message_compaction(

rocketmq-store/src/log_file/mapped_file/default_impl.rs

+2
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ impl MappedFile for DefaultMappedFile {
255255
message: &mut MessageExtBatch,
256256
message_callback: &AMC,
257257
put_message_context: &mut PutMessageContext,
258+
enabled_append_prop_crc: bool,
258259
) -> AppendMessageResult {
259260
let current_pos = self.wrote_position.load(Ordering::Acquire) as u64;
260261
if current_pos >= self.file_size {
@@ -273,6 +274,7 @@ impl MappedFile for DefaultMappedFile {
273274
(self.file_size - current_pos) as i32,
274275
message,
275276
put_message_context,
277+
enabled_append_prop_crc,
276278
);
277279
self.store_timestamp.store(
278280
message.message_ext_broker_inner.store_timestamp(),

rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs

+1
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ impl MappedFile for LocalMappedFile {
249249
message: &mut MessageExtBatch,
250250
message_callback: &AMC,
251251
put_message_context: &mut PutMessageContext,
252+
enabled_append_prop_crc: bool,
252253
) -> AppendMessageResult {
253254
todo!()
254255
}

rocketmq-store/src/message_encoder/message_ext_encoder.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,7 @@ impl MessageExtEncoder {
382382
self.byte_buf.clear();
383383

384384
let messages_byte_buff = message_ext_batch.wrap();
385-
messages_byte_buff.as_ref()?;
386-
let mut messages_byte_buff = messages_byte_buff.unwrap();
385+
let mut messages_byte_buff = messages_byte_buff?;
387386
let total_length = messages_byte_buff.len();
388387
if total_length > self.max_message_body_size as usize {
389388
warn!(
@@ -426,7 +425,7 @@ impl MessageExtEncoder {
426425
let current = total_length - messages_byte_buff.remaining();
427426
let need_append_last_property_separator = properties_len > 0
428427
&& batch_prop_len > 0
429-
&& messages_byte_buff[total_length - 1..total_length][0]
428+
&& properties_body.as_ref()[(properties_len - 1) as usize..][0]
430429
!= PROPERTY_SEPARATOR as u8;
431430
let topic_data = message_ext_batch
432431
.message_ext_broker_inner

0 commit comments

Comments
 (0)