Skip to content

Commit 1655c20

Browse files
authored
[ISSUE #135]Define rocketmq Message (#136)
1 parent 0707e64 commit 1655c20

File tree

7 files changed

+521
-0
lines changed

7 files changed

+521
-0
lines changed

rocketmq-common/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,8 @@ futures-core = "0.3"
3030
futures-executor = "0.3"
3131
futures-task = "0.3"
3232

33+
bytes.workspace = true
34+
lazy_static.workspace = true
35+
3336
#tools
3437
dirs.workspace = true

rocketmq-common/src/common.rs

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub mod mq_version;
2828
pub mod namesrv;
2929
mod sys_flag;
3030
pub use crate::common::sys_flag::topic_sys_flag as TopicSysFlag;
31+
pub mod message;
3132
pub mod topic;
3233

3334
#[derive(Debug, Clone)]

rocketmq-common/src/common/message.rs

+253
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
mod message_batch;
19+
mod message_id;
20+
mod message_queue;
21+
mod message_single;
22+
23+
use std::collections::HashMap;
24+
25+
pub trait MessageTrait {
26+
fn get_topic(&self) -> &str;
27+
28+
fn set_topic(&mut self, topic: impl Into<String>);
29+
30+
fn get_tags(&self) -> Option<&str>;
31+
32+
fn set_tags(&mut self, tags: impl Into<String>);
33+
34+
fn put_property(&mut self, key: impl Into<String>, value: impl Into<String>);
35+
36+
fn get_properties(&self) -> &HashMap<String, String>;
37+
38+
fn put_user_property(&mut self, name: impl Into<String>, value: impl Into<String>);
39+
40+
fn get_delay_time_level(&self) -> i32;
41+
42+
fn set_delay_time_level(&self, level: i32) -> i32;
43+
}
44+
45+
pub const MESSAGE_MAGIC_CODE_V1: i32 = -626843481;
46+
pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;
47+
48+
#[derive(Debug, PartialEq, Eq, Hash)]
49+
enum MessageVersion {
50+
V1(i32),
51+
V2(i32),
52+
}
53+
54+
impl MessageVersion {
55+
fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
56+
match magic_code {
57+
MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1(MESSAGE_MAGIC_CODE_V1)),
58+
MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2(MESSAGE_MAGIC_CODE_V2)),
59+
_ => Err("Invalid magicCode"),
60+
}
61+
}
62+
63+
fn get_magic_code(&self) -> i32 {
64+
match self {
65+
MessageVersion::V1(value) => *value,
66+
MessageVersion::V2(value) => *value,
67+
}
68+
}
69+
70+
fn get_topic_length_size(&self) -> usize {
71+
match self {
72+
MessageVersion::V1(_) => 1,
73+
MessageVersion::V2(_) => 2,
74+
}
75+
}
76+
77+
fn get_topic_length(&self, buffer: &[u8]) -> usize {
78+
match self {
79+
MessageVersion::V1(_) => buffer[0] as usize,
80+
MessageVersion::V2(_) => ((buffer[0] as usize) << 8) | (buffer[1] as usize),
81+
}
82+
}
83+
84+
fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
85+
match self {
86+
MessageVersion::V1(_) => buffer[index] as usize,
87+
MessageVersion::V2(_) => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
88+
}
89+
}
90+
91+
fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
92+
match self {
93+
MessageVersion::V1(_) => buffer.push(topic_length as u8),
94+
MessageVersion::V2(_) => {
95+
buffer.push((topic_length >> 8) as u8);
96+
buffer.push((topic_length & 0xFF) as u8);
97+
}
98+
}
99+
}
100+
}
101+
102+
use std::{collections::HashSet, string::ToString};
103+
104+
use lazy_static::lazy_static;
105+
106+
pub struct MessageConst;
107+
108+
impl MessageConst {
109+
pub const PROPERTY_KEYS: &'static str = "KEYS";
110+
pub const PROPERTY_TAGS: &'static str = "TAGS";
111+
pub const PROPERTY_WAIT_STORE_MSG_OK: &'static str = "WAIT";
112+
pub const PROPERTY_DELAY_TIME_LEVEL: &'static str = "DELAY";
113+
pub const PROPERTY_RETRY_TOPIC: &'static str = "RETRY_TOPIC";
114+
pub const PROPERTY_REAL_TOPIC: &'static str = "REAL_TOPIC";
115+
pub const PROPERTY_REAL_QUEUE_ID: &'static str = "REAL_QID";
116+
pub const PROPERTY_TRANSACTION_PREPARED: &'static str = "TRAN_MSG";
117+
pub const PROPERTY_PRODUCER_GROUP: &'static str = "PGROUP";
118+
pub const PROPERTY_MIN_OFFSET: &'static str = "MIN_OFFSET";
119+
pub const PROPERTY_MAX_OFFSET: &'static str = "MAX_OFFSET";
120+
pub const PROPERTY_BUYER_ID: &'static str = "BUYER_ID";
121+
pub const PROPERTY_ORIGIN_MESSAGE_ID: &'static str = "ORIGIN_MESSAGE_ID";
122+
pub const PROPERTY_TRANSFER_FLAG: &'static str = "TRANSFER_FLAG";
123+
pub const PROPERTY_CORRECTION_FLAG: &'static str = "CORRECTION_FLAG";
124+
pub const PROPERTY_MQ2_FLAG: &'static str = "MQ2_FLAG";
125+
pub const PROPERTY_RECONSUME_TIME: &'static str = "RECONSUME_TIME";
126+
pub const PROPERTY_MSG_REGION: &'static str = "MSG_REGION";
127+
pub const PROPERTY_TRACE_SWITCH: &'static str = "TRACE_ON";
128+
pub const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX: &'static str = "UNIQ_KEY";
129+
pub const PROPERTY_EXTEND_UNIQ_INFO: &'static str = "EXTEND_UNIQ_INFO";
130+
pub const PROPERTY_MAX_RECONSUME_TIMES: &'static str = "MAX_RECONSUME_TIMES";
131+
pub const PROPERTY_CONSUME_START_TIMESTAMP: &'static str = "CONSUME_START_TIME";
132+
pub const PROPERTY_INNER_NUM: &'static str = "INNER_NUM";
133+
pub const PROPERTY_INNER_BASE: &'static str = "INNER_BASE";
134+
pub const DUP_INFO: &'static str = "DUP_INFO";
135+
pub const PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: &'static str =
136+
"CHECK_IMMUNITY_TIME_IN_SECONDS";
137+
pub const PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET: &'static str =
138+
"TRAN_PREPARED_QUEUE_OFFSET";
139+
pub const PROPERTY_TRANSACTION_ID: &'static str = "__transactionId__";
140+
pub const PROPERTY_TRANSACTION_CHECK_TIMES: &'static str = "TRANSACTION_CHECK_TIMES";
141+
pub const PROPERTY_INSTANCE_ID: &'static str = "INSTANCE_ID";
142+
pub const PROPERTY_CORRELATION_ID: &'static str = "CORRELATION_ID";
143+
pub const PROPERTY_MESSAGE_REPLY_TO_CLIENT: &'static str = "REPLY_TO_CLIENT";
144+
pub const PROPERTY_MESSAGE_TTL: &'static str = "TTL";
145+
pub const PROPERTY_REPLY_MESSAGE_ARRIVE_TIME: &'static str = "ARRIVE_TIME";
146+
pub const PROPERTY_PUSH_REPLY_TIME: &'static str = "PUSH_REPLY_TIME";
147+
pub const PROPERTY_CLUSTER: &'static str = "CLUSTER";
148+
pub const PROPERTY_MESSAGE_TYPE: &'static str = "MSG_TYPE";
149+
pub const PROPERTY_POP_CK: &'static str = "POP_CK";
150+
pub const PROPERTY_POP_CK_OFFSET: &'static str = "POP_CK_OFFSET";
151+
pub const PROPERTY_FIRST_POP_TIME: &'static str = "1ST_POP_TIME";
152+
pub const PROPERTY_SHARDING_KEY: &'static str = "__SHARDINGKEY";
153+
pub const PROPERTY_FORWARD_QUEUE_ID: &'static str = "PROPERTY_FORWARD_QUEUE_ID";
154+
pub const PROPERTY_REDIRECT: &'static str = "REDIRECT";
155+
pub const PROPERTY_INNER_MULTI_DISPATCH: &'static str = "INNER_MULTI_DISPATCH";
156+
pub const PROPERTY_INNER_MULTI_QUEUE_OFFSET: &'static str = "INNER_MULTI_QUEUE_OFFSET";
157+
pub const PROPERTY_TRACE_CONTEXT: &'static str = "TRACE_CONTEXT";
158+
pub const PROPERTY_TIMER_DELAY_SEC: &'static str = "TIMER_DELAY_SEC";
159+
pub const PROPERTY_TIMER_DELIVER_MS: &'static str = "TIMER_DELIVER_MS";
160+
pub const PROPERTY_BORN_HOST: &'static str = "__BORNHOST";
161+
pub const PROPERTY_BORN_TIMESTAMP: &'static str = "BORN_TIMESTAMP";
162+
163+
/**
164+
* property which name starts with "__RMQ.TRANSIENT." is called transient one that will not
165+
* be stored in broker disks.
166+
*/
167+
pub const PROPERTY_TRANSIENT_PREFIX: &'static str = "__RMQ.TRANSIENT.";
168+
169+
/**
170+
* the transient property key of topicSysFlag (set by the client when pulling messages)
171+
*/
172+
pub const PROPERTY_TRANSIENT_TOPIC_CONFIG: &'static str = "__RMQ.TRANSIENT.TOPIC_SYS_FLAG";
173+
174+
/**
175+
* the transient property key of groupSysFlag (set by the client when pulling messages)
176+
*/
177+
pub const PROPERTY_TRANSIENT_GROUP_CONFIG: &'static str = "__RMQ.TRANSIENT.GROUP_SYS_FLAG";
178+
179+
pub const KEY_SEPARATOR: &'static str = " ";
180+
181+
pub const PROPERTY_TIMER_ENQUEUE_MS: &'static str = "TIMER_ENQUEUE_MS";
182+
pub const PROPERTY_TIMER_DEQUEUE_MS: &'static str = "TIMER_DEQUEUE_MS";
183+
pub const PROPERTY_TIMER_ROLL_TIMES: &'static str = "TIMER_ROLL_TIMES";
184+
pub const PROPERTY_TIMER_OUT_MS: &'static str = "TIMER_OUT_MS";
185+
pub const PROPERTY_TIMER_DEL_UNIQKEY: &'static str = "TIMER_DEL_UNIQKEY";
186+
pub const PROPERTY_TIMER_DELAY_LEVEL: &'static str = "TIMER_DELAY_LEVEL";
187+
pub const PROPERTY_TIMER_DELAY_MS: &'static str = "TIMER_DELAY_MS";
188+
pub const PROPERTY_CRC32: &'static str = "__CRC32#";
189+
190+
/**
191+
* properties for DLQ
192+
*/
193+
pub const PROPERTY_DLQ_ORIGIN_TOPIC: &'static str = "DLQ_ORIGIN_TOPIC";
194+
pub const PROPERTY_DLQ_ORIGIN_MESSAGE_ID: &'static str = "DLQ_ORIGIN_MESSAGE_ID";
195+
}
196+
197+
lazy_static! {
198+
pub static ref STRING_HASH_SET: HashSet<&'static str> = {
199+
let mut set = HashSet::with_capacity(64);
200+
set.insert(MessageConst::PROPERTY_TRACE_SWITCH);
201+
set.insert(MessageConst::PROPERTY_MSG_REGION);
202+
set.insert(MessageConst::PROPERTY_KEYS);
203+
set.insert(MessageConst::PROPERTY_TAGS);
204+
set.insert(MessageConst::PROPERTY_WAIT_STORE_MSG_OK);
205+
set.insert(MessageConst::PROPERTY_DELAY_TIME_LEVEL);
206+
set.insert(MessageConst::PROPERTY_RETRY_TOPIC);
207+
set.insert(MessageConst::PROPERTY_REAL_TOPIC);
208+
set.insert(MessageConst::PROPERTY_REAL_QUEUE_ID);
209+
set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED);
210+
set.insert(MessageConst::PROPERTY_PRODUCER_GROUP);
211+
set.insert(MessageConst::PROPERTY_MIN_OFFSET);
212+
set.insert(MessageConst::PROPERTY_MAX_OFFSET);
213+
set.insert(MessageConst::PROPERTY_BUYER_ID);
214+
set.insert(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID);
215+
set.insert(MessageConst::PROPERTY_TRANSFER_FLAG);
216+
set.insert(MessageConst::PROPERTY_CORRECTION_FLAG);
217+
set.insert(MessageConst::PROPERTY_MQ2_FLAG);
218+
set.insert(MessageConst::PROPERTY_RECONSUME_TIME);
219+
set.insert(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
220+
set.insert(MessageConst::PROPERTY_MAX_RECONSUME_TIMES);
221+
set.insert(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP);
222+
set.insert(MessageConst::PROPERTY_POP_CK);
223+
set.insert(MessageConst::PROPERTY_POP_CK_OFFSET);
224+
set.insert(MessageConst::PROPERTY_FIRST_POP_TIME);
225+
set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
226+
set.insert(MessageConst::DUP_INFO);
227+
set.insert(MessageConst::PROPERTY_EXTEND_UNIQ_INFO);
228+
set.insert(MessageConst::PROPERTY_INSTANCE_ID);
229+
set.insert(MessageConst::PROPERTY_CORRELATION_ID);
230+
set.insert(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT);
231+
set.insert(MessageConst::PROPERTY_MESSAGE_TTL);
232+
set.insert(MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
233+
set.insert(MessageConst::PROPERTY_PUSH_REPLY_TIME);
234+
set.insert(MessageConst::PROPERTY_CLUSTER);
235+
set.insert(MessageConst::PROPERTY_MESSAGE_TYPE);
236+
set.insert(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
237+
set.insert(MessageConst::PROPERTY_TIMER_DELAY_MS);
238+
set.insert(MessageConst::PROPERTY_TIMER_DELAY_SEC);
239+
set.insert(MessageConst::PROPERTY_TIMER_DELIVER_MS);
240+
set.insert(MessageConst::PROPERTY_TIMER_ENQUEUE_MS);
241+
set.insert(MessageConst::PROPERTY_TIMER_DEQUEUE_MS);
242+
set.insert(MessageConst::PROPERTY_TIMER_ROLL_TIMES);
243+
set.insert(MessageConst::PROPERTY_TIMER_OUT_MS);
244+
set.insert(MessageConst::PROPERTY_TIMER_DEL_UNIQKEY);
245+
set.insert(MessageConst::PROPERTY_TIMER_DELAY_LEVEL);
246+
set.insert(MessageConst::PROPERTY_BORN_HOST);
247+
set.insert(MessageConst::PROPERTY_BORN_TIMESTAMP);
248+
set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC);
249+
set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
250+
set.insert(MessageConst::PROPERTY_CRC32);
251+
set
252+
};
253+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::HashMap;
19+
20+
use crate::common::message::{
21+
message_single::{Message, MessageExtBrokerInner},
22+
MessageTrait,
23+
};
24+
25+
pub struct MessageBatch {
26+
pub messages: Vec<Message>,
27+
}
28+
29+
impl MessageTrait for MessageBatch {
30+
fn get_topic(&self) -> &str {
31+
todo!()
32+
}
33+
34+
fn set_topic(&mut self, _topic: impl Into<String>) {
35+
todo!()
36+
}
37+
38+
fn get_tags(&self) -> Option<&str> {
39+
todo!()
40+
}
41+
42+
fn set_tags(&mut self, _tags: impl Into<String>) {
43+
todo!()
44+
}
45+
46+
fn put_property(&mut self, _key: impl Into<String>, _value: impl Into<String>) {
47+
todo!()
48+
}
49+
50+
fn get_properties(&self) -> &HashMap<String, String> {
51+
todo!()
52+
}
53+
54+
fn put_user_property(&mut self, _name: impl Into<String>, _value: impl Into<String>) {
55+
todo!()
56+
}
57+
58+
fn get_delay_time_level(&self) -> i32 {
59+
todo!()
60+
}
61+
62+
fn set_delay_time_level(&self, _level: i32) -> i32 {
63+
todo!()
64+
}
65+
}
66+
67+
#[derive(Debug)]
68+
pub struct MessageExtBatch {
69+
pub message_ext_broker_inner: MessageExtBrokerInner,
70+
pub is_inner_batch: bool,
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::net::SocketAddr;
19+
20+
#[derive(Debug)]
21+
struct MessageId {
22+
pub address: SocketAddr,
23+
pub offset: i64,
24+
}

0 commit comments

Comments
 (0)