Skip to content

[ISSUE #230]🍻Implement MessageExtEncoder #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions rocketmq-common/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,62 +54,76 @@ pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum MessageVersion {
V1(i32),
V2(i32),
V1,
V2,
}

impl Default for MessageVersion {
fn default() -> Self {
Self::V1(MESSAGE_MAGIC_CODE_V1)
Self::V1
}
}

impl MessageVersion {
fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
pub fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
match magic_code {
MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1(MESSAGE_MAGIC_CODE_V1)),
MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2(MESSAGE_MAGIC_CODE_V2)),
MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1),
MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2),
_ => Err("Invalid magicCode"),
}
}

fn get_magic_code(&self) -> i32 {
pub fn get_magic_code(&self) -> i32 {
match self {
MessageVersion::V1(value) => *value,
MessageVersion::V2(value) => *value,
MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1,
MessageVersion::V2 => MESSAGE_MAGIC_CODE_V2,
}
}

fn get_topic_length_size(&self) -> usize {
pub fn get_topic_length_size(&self) -> usize {
match self {
MessageVersion::V1(_) => 1,
MessageVersion::V2(_) => 2,
MessageVersion::V1 => 1,
MessageVersion::V2 => 2,
}
}

fn get_topic_length(&self, buffer: &[u8]) -> usize {
pub fn get_topic_length(&self, buffer: &[u8]) -> usize {
match self {
MessageVersion::V1(_) => buffer[0] as usize,
MessageVersion::V2(_) => ((buffer[0] as usize) << 8) | (buffer[1] as usize),
MessageVersion::V1 => buffer[0] as usize,
MessageVersion::V2 => ((buffer[0] as usize) << 8) | (buffer[1] as usize),
}
}

fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
match self {
MessageVersion::V1(_) => buffer[index] as usize,
MessageVersion::V2(_) => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
MessageVersion::V1 => buffer[index] as usize,
MessageVersion::V2 => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
}
}

fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
pub fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
match self {
MessageVersion::V1(_) => buffer.push(topic_length as u8),
MessageVersion::V2(_) => {
MessageVersion::V1 => buffer.push(topic_length as u8),
MessageVersion::V2 => {
buffer.push((topic_length >> 8) as u8);
buffer.push((topic_length & 0xFF) as u8);
}
}
}

pub fn is_v1(&self) -> bool {
match self {
MessageVersion::V1 => true,
MessageVersion::V2 => false,
}
}

pub fn is_v2(&self) -> bool {
match self {
MessageVersion::V1 => false,
MessageVersion::V2 => true,
}
}
}

pub struct MessageConst;
Expand Down
8 changes: 8 additions & 0 deletions rocketmq-common/src/common/message/message_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

use std::collections::HashMap;

use bytes::Bytes;

use crate::common::message::{
message_single::{Message, MessageExtBrokerInner},
MessageTrait,
Expand Down Expand Up @@ -70,3 +72,9 @@ pub struct MessageExtBatch {
pub message_ext_broker_inner: MessageExtBrokerInner,
pub is_inner_batch: bool,
}

impl MessageExtBatch {
pub fn wrap(&self) -> Option<Bytes> {
self.message_ext_broker_inner.body()
}
}
206 changes: 204 additions & 2 deletions rocketmq-common/src/common/message/message_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

use std::{collections::HashMap, net::SocketAddr};

use bytes::{Buf, BufMut};

use crate::{
common::{
message::{MessageTrait, MessageVersion, MESSAGE_MAGIC_CODE_V1},
Expand All @@ -30,14 +32,32 @@ pub struct Message {
pub topic: String,
pub flag: i32,
pub properties: HashMap<String, String>,
pub body: bytes::Bytes,
pub body: Option<bytes::Bytes>,
pub transaction_id: Option<String>,
}

impl Message {
pub fn clear_property(&mut self, name: impl Into<String>) {
self.properties.remove(name.into().as_str());
}

pub fn body(&self) -> Option<bytes::Bytes> {
self.body.as_ref().cloned()
}

pub fn flag(&self) -> i32 {
self.flag
}

pub fn topic(&self) -> &str {
&self.topic
}
pub fn properties(&self) -> &HashMap<String, String> {
&self.properties
}
pub fn transaction_id(&self) -> Option<&str> {
self.transaction_id.as_deref()
}
}

#[allow(unused_variables)]
Expand Down Expand Up @@ -99,6 +119,31 @@ pub struct MessageExt {
}

impl MessageExt {
pub fn socket_address_2_byte_buffer(ip: &SocketAddr) -> bytes::Bytes {
match ip {
SocketAddr::V4(value) => {
let mut byte_buffer = bytes::BytesMut::with_capacity(4 + 4);
byte_buffer.put_slice(&value.ip().octets());
byte_buffer.put_i32(value.port() as i32);
byte_buffer.copy_to_bytes(byte_buffer.len())
}
SocketAddr::V6(value) => {
let mut byte_buffer = bytes::BytesMut::with_capacity(16 + 4);
byte_buffer.put_slice(&value.ip().octets());
byte_buffer.put_i32(value.port() as i32);
byte_buffer.copy_to_bytes(byte_buffer.len())
}
}
}

pub fn born_host_bytes(&self) -> bytes::Bytes {
Self::socket_address_2_byte_buffer(&self.born_host)
}

pub fn born_store_bytes(&self) -> bytes::Bytes {
Self::socket_address_2_byte_buffer(&self.store_host)
}

pub fn topic(&self) -> &str {
self.message_inner.topic()
}
Expand All @@ -118,6 +163,108 @@ impl MessageExt {
pub fn with_store_host_v6_flag(&mut self) {
self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
}

pub fn body(&self) -> Option<bytes::Bytes> {
self.message_inner.body()
}

#[inline]
pub fn sys_flag(&self) -> i32 {
self.sys_flag
}
#[inline]
pub fn body_crc(&self) -> u32 {
self.body_crc
}
#[inline]
pub fn queue_id(&self) -> i32 {
self.queue_id
}

pub fn flag(&self) -> i32 {
self.message_inner.flag()
}

pub fn message_inner(&self) -> &Message {
&self.message_inner
}
pub fn broker_name(&self) -> &str {
&self.broker_name
}
pub fn store_size(&self) -> i32 {
self.store_size
}
pub fn queue_offset(&self) -> i64 {
self.queue_offset
}
pub fn born_timestamp(&self) -> i64 {
self.born_timestamp
}
pub fn store_timestamp(&self) -> i64 {
self.store_timestamp
}
pub fn msg_id(&self) -> &str {
&self.msg_id
}
pub fn commit_log_offset(&self) -> i64 {
self.commit_log_offset
}
pub fn reconsume_times(&self) -> i32 {
self.reconsume_times
}
pub fn prepared_transaction_offset(&self) -> i64 {
self.prepared_transaction_offset
}

pub fn set_message_inner(&mut self, message_inner: Message) {
self.message_inner = message_inner;
}
pub fn set_broker_name(&mut self, broker_name: String) {
self.broker_name = broker_name;
}
pub fn set_queue_id(&mut self, queue_id: i32) {
self.queue_id = queue_id;
}
pub fn set_store_size(&mut self, store_size: i32) {
self.store_size = store_size;
}
pub fn set_queue_offset(&mut self, queue_offset: i64) {
self.queue_offset = queue_offset;
}
pub fn set_sys_flag(&mut self, sys_flag: i32) {
self.sys_flag = sys_flag;
}
pub fn set_born_timestamp(&mut self, born_timestamp: i64) {
self.born_timestamp = born_timestamp;
}
pub fn set_born_host(&mut self, born_host: SocketAddr) {
self.born_host = born_host;
}
pub fn set_store_timestamp(&mut self, store_timestamp: i64) {
self.store_timestamp = store_timestamp;
}
pub fn set_store_host(&mut self, store_host: SocketAddr) {
self.store_host = store_host;
}
pub fn set_msg_id(&mut self, msg_id: String) {
self.msg_id = msg_id;
}
pub fn set_commit_log_offset(&mut self, commit_log_offset: i64) {
self.commit_log_offset = commit_log_offset;
}
pub fn set_body_crc(&mut self, body_crc: u32) {
self.body_crc = body_crc;
}
pub fn set_reconsume_times(&mut self, reconsume_times: i32) {
self.reconsume_times = reconsume_times;
}
pub fn set_prepared_transaction_offset(&mut self, prepared_transaction_offset: i64) {
self.prepared_transaction_offset = prepared_transaction_offset;
}

pub fn properties(&self) -> &HashMap<String, String> {
self.message_inner.properties()
}
}

impl Default for MessageExt {
Expand Down Expand Up @@ -158,7 +305,7 @@ pub struct MessageExtBrokerInner {
}

impl MessageExtBrokerInner {
const VERSION: MessageVersion = MessageVersion::V1(MESSAGE_MAGIC_CODE_V1);
const VERSION: MessageVersion = MessageVersion::V1;

pub fn delete_property(&mut self, name: impl Into<String>) {
let name = name.into();
Expand All @@ -173,6 +320,10 @@ impl MessageExtBrokerInner {
self.version = version;
}

pub fn version(&self) -> MessageVersion {
self.version
}

pub fn topic(&self) -> &str {
self.message_ext_inner.topic()
}
Expand All @@ -192,4 +343,55 @@ impl MessageExtBrokerInner {
pub fn with_store_host_v6_flag(&mut self) {
self.message_ext_inner.with_store_host_v6_flag()
}

pub fn body(&self) -> Option<bytes::Bytes> {
self.message_ext_inner.body()
}

pub fn sys_flag(&self) -> i32 {
self.message_ext_inner.sys_flag()
}
pub fn body_crc(&self) -> u32 {
self.message_ext_inner.body_crc()
}
pub fn queue_id(&self) -> i32 {
self.message_ext_inner.queue_id()
}
pub fn flag(&self) -> i32 {
self.message_ext_inner.flag()
}
pub fn born_timestamp(&self) -> i64 {
self.message_ext_inner.born_timestamp()
}

pub fn store_timestamp(&self) -> i64 {
self.message_ext_inner.store_timestamp()
}

pub fn born_host_bytes(&self) -> bytes::Bytes {
self.message_ext_inner.born_host_bytes()
}

pub fn store_host_bytes(&self) -> bytes::Bytes {
self.message_ext_inner.born_store_bytes()
}

pub fn reconsume_times(&self) -> i32 {
self.message_ext_inner.reconsume_times()
}
pub fn prepared_transaction_offset(&self) -> i64 {
self.message_ext_inner.prepared_transaction_offset()
}

pub fn property(&self, name: &str) -> Option<String> {
self.message_ext_inner.properties().get(name).cloned()
}

pub fn properties_string(&self) -> &str {
self.properties_string.as_str()
}

pub fn queue_offset(&self) -> i64 {
self.message_ext_inner.queue_offset()
}
}
2 changes: 1 addition & 1 deletion rocketmq-store/src/config/message_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub struct MessageStoreConfig {
pub file_reserved_time: usize,
pub delete_file_batch_max: usize,
pub put_msg_index_hight_water: usize,
pub max_message_size: usize,
pub max_message_size: i32,
pub check_crc_on_recover: bool,
pub flush_commit_log_least_pages: usize,
pub commit_commit_log_least_pages: usize,
Expand Down
Loading