Skip to content

Commit 4bf6e4f

Browse files
authored
[ISSUE #368]🚀Optimize send message logic (#369)
1 parent 7763145 commit 4bf6e4f

File tree

16 files changed

+204
-74
lines changed

16 files changed

+204
-74
lines changed

Cargo.toml

+5-3
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ Unofficial Rust implementation of Apache RocketMQ
2525
"""
2626
[workspace.dependencies]
2727
tokio = { version = "1.35", features = ["full"] }
28-
tokio-util = {version = "0.7.10",features = ["full"]}
29-
tokio-stream = {version = "0.1.14",features = ["full"]}
28+
tokio-util = { version = "0.7.10", features = ["full"] }
29+
tokio-stream = { version = "0.1.14", features = ["full"] }
3030

3131
log = "0.4.0"
3232
env_logger = "0.11.2"
@@ -53,4 +53,6 @@ config = "0.14"
5353

5454
parking_lot = "0.12"
5555
dirs = "5.0"
56-
trait-variant = "0.1.2"
56+
trait-variant = "0.1.2"
57+
58+
once_cell = "1.19.0"

rocketmq-broker/src/broker_runtime.rs

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ impl BrokerRuntime {
174174
if let Some(message_store) = &mut self.message_store {
175175
message_store.shutdown()
176176
}
177+
177178
if let Some(runtime) = self.broker_runtime.take() {
178179
runtime.shutdown();
179180
}

rocketmq-broker/src/processor/send_message_processor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ impl<MS: MessageStore + Send + Clone> SendMessageProcessor<MS> {
291291
message_ext.tags_code = MessageExtBrokerInner::tags_string2tags_code(
292292
&topic_config.topic_filter_type,
293293
message_ext.get_tags().unwrap_or("".to_string()).as_str(),
294-
) as i64;
294+
);
295295

296296
message_ext.message_ext_inner.born_timestamp = request_header.born_timestamp;
297297
message_ext.message_ext_inner.born_host = ctx.remoting_address();

rocketmq-common/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@ local-ip-address = "0.6.1"
4949
chrono = "0.4.38"
5050
log = "0.4.20"
5151

52-
parking_lot = { workspace = true }
52+
parking_lot = { workspace = true }
53+
once_cell = { workspace = true }

rocketmq-common/src/common.rs

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod consumer;
3333
mod faq;
3434
pub mod filter;
3535
pub mod future;
36+
pub mod hasher;
3637
pub mod macros;
3738
pub mod message;
3839
pub mod mix_all;

rocketmq-common/src/common/hasher.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub mod string_hasher;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::hash::Hasher;
18+
19+
//Compatible with Java String's hash code
20+
pub struct JavaStringHasher {
21+
state: i32,
22+
}
23+
24+
impl Default for JavaStringHasher {
25+
fn default() -> Self {
26+
Self::new()
27+
}
28+
}
29+
30+
impl JavaStringHasher {
31+
pub fn new() -> Self {
32+
JavaStringHasher { state: 0 }
33+
}
34+
35+
pub fn hash_str(&mut self, s: &str) -> i32 {
36+
if self.state == 0 && !s.is_empty() {
37+
for c in s.chars() {
38+
self.state = self.state.wrapping_mul(31).wrapping_add(c as i32);
39+
}
40+
}
41+
self.state
42+
}
43+
}
44+
45+
impl Hasher for JavaStringHasher {
46+
fn finish(&self) -> u64 {
47+
self.state as u64
48+
}
49+
50+
fn write(&mut self, bytes: &[u8]) {
51+
for &byte in bytes {
52+
self.state = self.state.wrapping_mul(31).wrapping_add(byte as i32);
53+
}
54+
}
55+
}
56+
57+
#[cfg(test)]
58+
mod tests {
59+
use super::*;
60+
#[test]
61+
fn test_java_string_hasher() {
62+
let mut hasher = JavaStringHasher::new();
63+
let i = hasher.hash_str("hello world");
64+
assert_eq!(i, 1794106052);
65+
}
66+
}

rocketmq-common/src/common/message/message_single.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use bytes::{Buf, BufMut};
2525

2626
use crate::{
2727
common::{
28+
hasher::string_hasher::JavaStringHasher,
2829
message::{MessageConst, MessageTrait, MessageVersion},
2930
sys_flag::message_sys_flag::MessageSysFlag,
3031
TopicFilterType,
@@ -414,13 +415,11 @@ impl MessageExtBrokerInner {
414415
self.message_ext_inner.queue_offset()
415416
}
416417

417-
pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> u64 {
418+
pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> i64 {
418419
if tags.is_empty() {
419420
return 0;
420421
}
421-
let mut hasher = DefaultHasher::new();
422-
tags.hash(&mut hasher);
423-
hasher.finish()
422+
JavaStringHasher::new().hash_str(tags) as i64
424423
}
425424

426425
pub fn get_tags(&self) -> Option<String> {
@@ -444,7 +443,5 @@ pub fn tags_string2tags_code(tags: Option<&String>) -> i64 {
444443
if tags.is_empty() {
445444
return 0;
446445
}
447-
let mut hasher = DefaultHasher::new();
448-
tags.hash(&mut hasher);
449-
hasher.finish() as i64
446+
JavaStringHasher::new().hash_str(tags.as_str()) as i64
450447
}

rocketmq-common/src/common/mix_all.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18+
use std::env;
19+
20+
use once_cell::sync::Lazy;
21+
1822
pub const ROCKETMQ_HOME_ENV: &str = "ROCKETMQ_HOME";
1923
pub const ROCKETMQ_HOME_PROPERTY: &str = "rocketmq.home.dir";
2024
pub const NAMESRV_ADDR_ENV: &str = "NAMESRV_ADDR";
@@ -67,7 +71,8 @@ pub const ZONE_MODE: &str = "__ZONE_MODE";
6771
pub const LOGICAL_QUEUE_MOCK_BROKER_PREFIX: &str = "__syslo__";
6872
pub const METADATA_SCOPE_GLOBAL: &str = "__global__";
6973
pub const LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST: &str = "__syslo__none__";
70-
pub const MULTI_PATH_SPLITTER: &str = "rocketmq.broker.multiPathSplitter";
74+
pub static MULTI_PATH_SPLITTER: Lazy<String> =
75+
Lazy::new(|| env::var("rocketmq.broker.multiPathSplitter").unwrap_or_else(|_| ",".to_string()));
7176

7277
pub fn is_sys_consumer_group(consumer_group: &str) -> bool {
7378
consumer_group.starts_with(CID_RMQ_SYS_PREFIX)

rocketmq-common/src/utils/util_all.rs

+28-9
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::path::PathBuf;
18+
use std::{
19+
env, fs,
20+
path::{Path, PathBuf},
21+
};
1922

2023
use chrono::{DateTime, Datelike, TimeZone, Timelike, Utc};
24+
use once_cell::sync::Lazy;
25+
use tracing::info;
26+
27+
use crate::common::mix_all::MULTI_PATH_SPLITTER;
2128

2229
const HEX_ARRAY: [char; 16] = [
2330
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F',
@@ -95,16 +102,28 @@ pub fn offset_to_file_name(offset: u64) -> String {
95102
format!("{:020}", offset)
96103
}
97104

98-
/*pub fn ensure_dir_ok(dir: impl AsRef<PathBuf>) -> Result<(), std::io::Error> {
99-
match dir.as_ref().exists() {
100-
true => Ok(()),
101-
false => Err(std::io::Error::new(
102-
std::io::ErrorKind::NotFound,
103-
format!("{:?}", dir.as_ref()),
104-
)),
105+
pub fn ensure_dir_ok(dir_name: &str) {
106+
if !dir_name.is_empty() {
107+
let multi_path_splitter = MULTI_PATH_SPLITTER.as_str();
108+
if dir_name.contains(multi_path_splitter) {
109+
for dir in dir_name.trim().split(&multi_path_splitter) {
110+
create_dir_if_not_exist(dir);
111+
}
112+
} else {
113+
create_dir_if_not_exist(dir_name);
114+
}
105115
}
106-
}*/
116+
}
107117

118+
fn create_dir_if_not_exist(dir_name: &str) {
119+
let path = Path::new(dir_name);
120+
if !path.exists() {
121+
match fs::create_dir_all(path) {
122+
Ok(_) => info!("{} mkdir OK", dir_name),
123+
Err(_) => info!("{} mkdir Failed", dir_name),
124+
}
125+
}
126+
}
108127
#[cfg(test)]
109128
mod tests {
110129
use super::*;

rocketmq-store/src/base/append_message_callback.rs

+14-17
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
use std::{collections::HashMap, sync::Arc};
1818

19-
use bytes::{Bytes, BytesMut};
19+
use bytes::{BufMut, Bytes, BytesMut};
2020
use rocketmq_common::{
2121
common::{
2222
attribute::cq_type::CQType,
@@ -36,7 +36,7 @@ use crate::{
3636
},
3737
config::message_store_config::MessageStoreConfig,
3838
log_file::{
39-
commit_log::{CommitLog, CRC32_RESERVED_LEN},
39+
commit_log::{CommitLog, BLANK_MAGIC_CODE, CRC32_RESERVED_LEN},
4040
mapped_file::MappedFile,
4141
},
4242
};
@@ -101,7 +101,7 @@ pub trait AppendMessageCallback {
101101
const END_FILE_MIN_BLANK_LENGTH: i32 = 4 + 4;
102102

103103
pub(crate) struct DefaultAppendMessageCallback {
104-
msg_store_item_memory: bytes::BytesMut,
104+
//msg_store_item_memory: bytes::BytesMut,
105105
crc32_reserved_length: i32,
106106
message_store_config: Arc<MessageStoreConfig>,
107107
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
@@ -113,9 +113,9 @@ impl DefaultAppendMessageCallback {
113113
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
114114
) -> Self {
115115
Self {
116-
msg_store_item_memory: bytes::BytesMut::with_capacity(
116+
/* msg_store_item_memory: bytes::BytesMut::with_capacity(
117117
END_FILE_MIN_BLANK_LENGTH as usize,
118-
),
118+
),*/
119119
crc32_reserved_length: CRC32_RESERVED_LEN,
120120
message_store_config,
121121
topic_config_table,
@@ -178,42 +178,38 @@ impl AppendMessageCallback for DefaultAppendMessageCallback {
178178
match MessageSysFlag::get_transaction_value(msg_inner.sys_flag()) {
179179
MessageSysFlag::TRANSACTION_PREPARED_TYPE
180180
| MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => queue_offset = 0,
181-
// MessageSysFlag::TRANSACTION_NOT_TYPE | MessageSysFlag::TRANSACTION_COMMIT_TYPE | _ =>
182-
// {}
183181
_ => {}
184182
}
185183

186184
if (msg_len + END_FILE_MIN_BLANK_LENGTH) > max_blank {
187-
/*self.msg_store_item_memory.borrow_mut().clear();
188-
self.msg_store_item_memory.borrow_mut().put_i32(max_blank);
189-
self.msg_store_item_memory
190-
.borrow_mut()
191-
.put_i32(BLANK_MAGIC_CODE);
192-
let bytes = self.msg_store_item_memory.borrow_mut().split().freeze();
193-
mapped_file.append_message_bytes(&bytes);*/
185+
let mut bytes = BytesMut::with_capacity(END_FILE_MIN_BLANK_LENGTH as usize);
186+
bytes.put_i32(max_blank);
187+
bytes.put_i32(BLANK_MAGIC_CODE);
188+
mapped_file.append_message_bytes(&bytes.freeze());
194189
return AppendMessageResult {
195190
status: AppendMessageStatus::EndOfFile,
196191
wrote_offset,
197192
wrote_bytes: max_blank,
198193
msg_id,
199194
store_timestamp: msg_inner.store_timestamp(),
200195
logics_offset: queue_offset,
196+
msg_num: message_num as i32,
201197
..Default::default()
202198
};
203199
}
204200

205201
let mut pos = 4 + 4 + 4 + 4 + 4;
206-
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_le_bytes());
202+
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_be_bytes());
207203
pos += 8;
208-
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&wrote_offset.to_le_bytes());
204+
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&wrote_offset.to_be_bytes());
209205
let ip_len = if msg_inner.sys_flag() & MessageSysFlag::BORNHOST_V6_FLAG == 0 {
210206
4 + 4
211207
} else {
212208
16 + 4
213209
};
214210
pos += 8 + 4 + 8 + ip_len;
215211
pre_encode_buffer[pos..(pos + 8)]
216-
.copy_from_slice(&msg_inner.store_timestamp().to_le_bytes());
212+
.copy_from_slice(&msg_inner.store_timestamp().to_be_bytes());
217213

218214
// msg_inner.encoded_buff = pre_encode_buffer;
219215
let bytes = Bytes::from(pre_encode_buffer);
@@ -225,6 +221,7 @@ impl AppendMessageCallback for DefaultAppendMessageCallback {
225221
msg_id,
226222
store_timestamp: msg_inner.store_timestamp(),
227223
logics_offset: queue_offset,
224+
msg_num: message_num as i32,
228225
..Default::default()
229226
}
230227
}

rocketmq-store/src/consume_queue/mapped_file_queue.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,11 @@ impl MappedFileQueue {
294294
>= last_mapped_file.as_ref().unwrap().get_file_from_offset() as i64
295295
+ self.mapped_file_size as i64
296296
{
297-
None
297+
if return_first_on_not_found {
298+
first_mapped_file
299+
} else {
300+
None
301+
}
298302
} else {
299303
let index = offset as usize / self.mapped_file_size as usize
300304
- first_mapped_file.as_ref().unwrap().get_file_from_offset() as usize

0 commit comments

Comments
 (0)