Skip to content

Commit d9e5172

Browse files
authored
[ISSUE #230]🍻Implement MessageExtEncoder (#231)
1 parent d53b0cd commit d9e5172

File tree

7 files changed

+823
-31
lines changed

7 files changed

+823
-31
lines changed

rocketmq-common/src/common/message.rs

+35-21
Original file line numberDiff line numberDiff line change
@@ -54,62 +54,76 @@ pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;
5454

5555
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
5656
pub enum MessageVersion {
57-
V1(i32),
58-
V2(i32),
57+
V1,
58+
V2,
5959
}
6060

6161
impl Default for MessageVersion {
6262
fn default() -> Self {
63-
Self::V1(MESSAGE_MAGIC_CODE_V1)
63+
Self::V1
6464
}
6565
}
6666

6767
impl MessageVersion {
68-
fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
68+
pub fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
6969
match magic_code {
70-
MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1(MESSAGE_MAGIC_CODE_V1)),
71-
MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2(MESSAGE_MAGIC_CODE_V2)),
70+
MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1),
71+
MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2),
7272
_ => Err("Invalid magicCode"),
7373
}
7474
}
7575

76-
fn get_magic_code(&self) -> i32 {
76+
pub fn get_magic_code(&self) -> i32 {
7777
match self {
78-
MessageVersion::V1(value) => *value,
79-
MessageVersion::V2(value) => *value,
78+
MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1,
79+
MessageVersion::V2 => MESSAGE_MAGIC_CODE_V2,
8080
}
8181
}
8282

83-
fn get_topic_length_size(&self) -> usize {
83+
pub fn get_topic_length_size(&self) -> usize {
8484
match self {
85-
MessageVersion::V1(_) => 1,
86-
MessageVersion::V2(_) => 2,
85+
MessageVersion::V1 => 1,
86+
MessageVersion::V2 => 2,
8787
}
8888
}
8989

90-
fn get_topic_length(&self, buffer: &[u8]) -> usize {
90+
pub fn get_topic_length(&self, buffer: &[u8]) -> usize {
9191
match self {
92-
MessageVersion::V1(_) => buffer[0] as usize,
93-
MessageVersion::V2(_) => ((buffer[0] as usize) << 8) | (buffer[1] as usize),
92+
MessageVersion::V1 => buffer[0] as usize,
93+
MessageVersion::V2 => ((buffer[0] as usize) << 8) | (buffer[1] as usize),
9494
}
9595
}
9696

97-
fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
97+
pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
9898
match self {
99-
MessageVersion::V1(_) => buffer[index] as usize,
100-
MessageVersion::V2(_) => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
99+
MessageVersion::V1 => buffer[index] as usize,
100+
MessageVersion::V2 => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
101101
}
102102
}
103103

104-
fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
104+
pub fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
105105
match self {
106-
MessageVersion::V1(_) => buffer.push(topic_length as u8),
107-
MessageVersion::V2(_) => {
106+
MessageVersion::V1 => buffer.push(topic_length as u8),
107+
MessageVersion::V2 => {
108108
buffer.push((topic_length >> 8) as u8);
109109
buffer.push((topic_length & 0xFF) as u8);
110110
}
111111
}
112112
}
113+
114+
pub fn is_v1(&self) -> bool {
115+
match self {
116+
MessageVersion::V1 => true,
117+
MessageVersion::V2 => false,
118+
}
119+
}
120+
121+
pub fn is_v2(&self) -> bool {
122+
match self {
123+
MessageVersion::V1 => false,
124+
MessageVersion::V2 => true,
125+
}
126+
}
113127
}
114128

115129
pub struct MessageConst;

rocketmq-common/src/common/message/message_batch.rs

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use std::collections::HashMap;
1919

20+
use bytes::Bytes;
21+
2022
use crate::common::message::{
2123
message_single::{Message, MessageExtBrokerInner},
2224
MessageTrait,
@@ -70,3 +72,9 @@ pub struct MessageExtBatch {
7072
pub message_ext_broker_inner: MessageExtBrokerInner,
7173
pub is_inner_batch: bool,
7274
}
75+
76+
impl MessageExtBatch {
77+
pub fn wrap(&self) -> Option<Bytes> {
78+
self.message_ext_broker_inner.body()
79+
}
80+
}

rocketmq-common/src/common/message/message_single.rs

+204-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use std::{collections::HashMap, net::SocketAddr};
1919

20+
use bytes::{Buf, BufMut};
21+
2022
use crate::{
2123
common::{
2224
message::{MessageTrait, MessageVersion, MESSAGE_MAGIC_CODE_V1},
@@ -30,14 +32,32 @@ pub struct Message {
3032
pub topic: String,
3133
pub flag: i32,
3234
pub properties: HashMap<String, String>,
33-
pub body: bytes::Bytes,
35+
pub body: Option<bytes::Bytes>,
3436
pub transaction_id: Option<String>,
3537
}
3638

3739
impl Message {
3840
pub fn clear_property(&mut self, name: impl Into<String>) {
3941
self.properties.remove(name.into().as_str());
4042
}
43+
44+
pub fn body(&self) -> Option<bytes::Bytes> {
45+
self.body.as_ref().cloned()
46+
}
47+
48+
pub fn flag(&self) -> i32 {
49+
self.flag
50+
}
51+
52+
pub fn topic(&self) -> &str {
53+
&self.topic
54+
}
55+
pub fn properties(&self) -> &HashMap<String, String> {
56+
&self.properties
57+
}
58+
pub fn transaction_id(&self) -> Option<&str> {
59+
self.transaction_id.as_deref()
60+
}
4161
}
4262

4363
#[allow(unused_variables)]
@@ -99,6 +119,31 @@ pub struct MessageExt {
99119
}
100120

101121
impl MessageExt {
122+
pub fn socket_address_2_byte_buffer(ip: &SocketAddr) -> bytes::Bytes {
123+
match ip {
124+
SocketAddr::V4(value) => {
125+
let mut byte_buffer = bytes::BytesMut::with_capacity(4 + 4);
126+
byte_buffer.put_slice(&value.ip().octets());
127+
byte_buffer.put_i32(value.port() as i32);
128+
byte_buffer.copy_to_bytes(byte_buffer.len())
129+
}
130+
SocketAddr::V6(value) => {
131+
let mut byte_buffer = bytes::BytesMut::with_capacity(16 + 4);
132+
byte_buffer.put_slice(&value.ip().octets());
133+
byte_buffer.put_i32(value.port() as i32);
134+
byte_buffer.copy_to_bytes(byte_buffer.len())
135+
}
136+
}
137+
}
138+
139+
pub fn born_host_bytes(&self) -> bytes::Bytes {
140+
Self::socket_address_2_byte_buffer(&self.born_host)
141+
}
142+
143+
pub fn born_store_bytes(&self) -> bytes::Bytes {
144+
Self::socket_address_2_byte_buffer(&self.store_host)
145+
}
146+
102147
pub fn topic(&self) -> &str {
103148
self.message_inner.topic()
104149
}
@@ -118,6 +163,108 @@ impl MessageExt {
118163
pub fn with_store_host_v6_flag(&mut self) {
119164
self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
120165
}
166+
167+
pub fn body(&self) -> Option<bytes::Bytes> {
168+
self.message_inner.body()
169+
}
170+
171+
#[inline]
172+
pub fn sys_flag(&self) -> i32 {
173+
self.sys_flag
174+
}
175+
#[inline]
176+
pub fn body_crc(&self) -> u32 {
177+
self.body_crc
178+
}
179+
#[inline]
180+
pub fn queue_id(&self) -> i32 {
181+
self.queue_id
182+
}
183+
184+
pub fn flag(&self) -> i32 {
185+
self.message_inner.flag()
186+
}
187+
188+
pub fn message_inner(&self) -> &Message {
189+
&self.message_inner
190+
}
191+
pub fn broker_name(&self) -> &str {
192+
&self.broker_name
193+
}
194+
pub fn store_size(&self) -> i32 {
195+
self.store_size
196+
}
197+
pub fn queue_offset(&self) -> i64 {
198+
self.queue_offset
199+
}
200+
pub fn born_timestamp(&self) -> i64 {
201+
self.born_timestamp
202+
}
203+
pub fn store_timestamp(&self) -> i64 {
204+
self.store_timestamp
205+
}
206+
pub fn msg_id(&self) -> &str {
207+
&self.msg_id
208+
}
209+
pub fn commit_log_offset(&self) -> i64 {
210+
self.commit_log_offset
211+
}
212+
pub fn reconsume_times(&self) -> i32 {
213+
self.reconsume_times
214+
}
215+
pub fn prepared_transaction_offset(&self) -> i64 {
216+
self.prepared_transaction_offset
217+
}
218+
219+
pub fn set_message_inner(&mut self, message_inner: Message) {
220+
self.message_inner = message_inner;
221+
}
222+
pub fn set_broker_name(&mut self, broker_name: String) {
223+
self.broker_name = broker_name;
224+
}
225+
pub fn set_queue_id(&mut self, queue_id: i32) {
226+
self.queue_id = queue_id;
227+
}
228+
pub fn set_store_size(&mut self, store_size: i32) {
229+
self.store_size = store_size;
230+
}
231+
pub fn set_queue_offset(&mut self, queue_offset: i64) {
232+
self.queue_offset = queue_offset;
233+
}
234+
pub fn set_sys_flag(&mut self, sys_flag: i32) {
235+
self.sys_flag = sys_flag;
236+
}
237+
pub fn set_born_timestamp(&mut self, born_timestamp: i64) {
238+
self.born_timestamp = born_timestamp;
239+
}
240+
pub fn set_born_host(&mut self, born_host: SocketAddr) {
241+
self.born_host = born_host;
242+
}
243+
pub fn set_store_timestamp(&mut self, store_timestamp: i64) {
244+
self.store_timestamp = store_timestamp;
245+
}
246+
pub fn set_store_host(&mut self, store_host: SocketAddr) {
247+
self.store_host = store_host;
248+
}
249+
pub fn set_msg_id(&mut self, msg_id: String) {
250+
self.msg_id = msg_id;
251+
}
252+
pub fn set_commit_log_offset(&mut self, commit_log_offset: i64) {
253+
self.commit_log_offset = commit_log_offset;
254+
}
255+
pub fn set_body_crc(&mut self, body_crc: u32) {
256+
self.body_crc = body_crc;
257+
}
258+
pub fn set_reconsume_times(&mut self, reconsume_times: i32) {
259+
self.reconsume_times = reconsume_times;
260+
}
261+
pub fn set_prepared_transaction_offset(&mut self, prepared_transaction_offset: i64) {
262+
self.prepared_transaction_offset = prepared_transaction_offset;
263+
}
264+
265+
pub fn properties(&self) -> &HashMap<String, String> {
266+
self.message_inner.properties()
267+
}
121268
}
122269

123270
impl Default for MessageExt {
@@ -158,7 +305,7 @@ pub struct MessageExtBrokerInner {
158305
}
159306

160307
impl MessageExtBrokerInner {
161-
const VERSION: MessageVersion = MessageVersion::V1(MESSAGE_MAGIC_CODE_V1);
308+
const VERSION: MessageVersion = MessageVersion::V1;
162309

163310
pub fn delete_property(&mut self, name: impl Into<String>) {
164311
let name = name.into();
@@ -173,6 +320,10 @@ impl MessageExtBrokerInner {
173320
self.version = version;
174321
}
175322

323+
pub fn version(&self) -> MessageVersion {
324+
self.version
325+
}
326+
176327
pub fn topic(&self) -> &str {
177328
self.message_ext_inner.topic()
178329
}
@@ -192,4 +343,55 @@ impl MessageExtBrokerInner {
192343
pub fn with_store_host_v6_flag(&mut self) {
193344
self.message_ext_inner.with_store_host_v6_flag()
194345
}
346+
347+
pub fn body(&self) -> Option<bytes::Bytes> {
348+
self.message_ext_inner.body()
349+
}
350+
351+
pub fn sys_flag(&self) -> i32 {
352+
self.message_ext_inner.sys_flag()
353+
}
354+
pub fn body_crc(&self) -> u32 {
355+
self.message_ext_inner.body_crc()
356+
}
357+
pub fn queue_id(&self) -> i32 {
358+
self.message_ext_inner.queue_id()
359+
}
360+
pub fn flag(&self) -> i32 {
361+
self.message_ext_inner.flag()
362+
}
363+
pub fn born_timestamp(&self) -> i64 {
364+
self.message_ext_inner.born_timestamp()
365+
}
366+
367+
pub fn store_timestamp(&self) -> i64 {
368+
self.message_ext_inner.store_timestamp()
369+
}
370+
371+
pub fn born_host_bytes(&self) -> bytes::Bytes {
372+
self.message_ext_inner.born_host_bytes()
373+
}
374+
375+
pub fn store_host_bytes(&self) -> bytes::Bytes {
376+
self.message_ext_inner.born_store_bytes()
377+
}
378+
379+
pub fn reconsume_times(&self) -> i32 {
380+
self.message_ext_inner.reconsume_times()
381+
}
382+
pub fn prepared_transaction_offset(&self) -> i64 {
383+
self.message_ext_inner.prepared_transaction_offset()
384+
}
385+
386+
pub fn property(&self, name: &str) -> Option<String> {
387+
self.message_ext_inner.properties().get(name).cloned()
388+
}
389+
390+
pub fn properties_string(&self) -> &str {
391+
self.properties_string.as_str()
392+
}
393+
394+
pub fn queue_offset(&self) -> i64 {
395+
self.message_ext_inner.queue_offset()
396+
}
195397
}

rocketmq-store/src/config/message_store_config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub struct MessageStoreConfig {
8787
pub file_reserved_time: usize,
8888
pub delete_file_batch_max: usize,
8989
pub put_msg_index_hight_water: usize,
90-
pub max_message_size: usize,
90+
pub max_message_size: i32,
9191
pub check_crc_on_recover: bool,
9292
pub flush_commit_log_least_pages: usize,
9393
pub commit_commit_log_least_pages: usize,

0 commit comments

Comments
 (0)