Skip to content

Commit b9fe722

Browse files
authored
[ISSUE #350]📌Optimize AppendMessageCallback trait and implemention (#351)
1 parent 1491f69 commit b9fe722

File tree

4 files changed

+68
-27
lines changed

4 files changed

+68
-27
lines changed

‎rocketmq-store/src/base/append_message_callback.rs

+50-14
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::sync::Arc;
17+
use std::{collections::HashMap, sync::Arc};
1818

1919
use bytes::BytesMut;
2020
use rocketmq_common::{
2121
common::{
22-
message::{message_batch::MessageExtBatch, message_single::MessageExtBrokerInner},
22+
attribute::cq_type::CQType,
23+
config::TopicConfig,
24+
message::{
25+
message_batch::MessageExtBatch, message_single::MessageExtBrokerInner, MessageConst,
26+
},
2327
sys_flag::message_sys_flag::MessageSysFlag,
2428
},
25-
utils::message_utils,
29+
utils::{message_utils, queue_type_utils::QueueTypeUtils},
2630
};
2731

2832
use crate::{
@@ -31,7 +35,10 @@ use crate::{
3135
put_message_context::PutMessageContext,
3236
},
3337
config::message_store_config::MessageStoreConfig,
34-
log_file::commit_log::{CommitLog, CRC32_RESERVED_LEN},
38+
log_file::{
39+
commit_log::{CommitLog, CRC32_RESERVED_LEN},
40+
mapped_file::default_impl_refactor::LocalMappedFile,
41+
},
3542
};
3643

3744
/// Write messages callback interface
@@ -50,9 +57,9 @@ pub trait AppendMessageCallback {
5057
///
5158
/// The number of bytes written
5259
fn do_append(
53-
&mut self,
60+
&self,
5461
file_from_offset: i64,
55-
file_wrote_position: i64,
62+
mapped_file: &LocalMappedFile,
5663
max_blank: i32,
5764
msg: &mut MessageExtBrokerInner,
5865
put_message_context: &PutMessageContext,
@@ -85,29 +92,58 @@ pub trait AppendMessageCallback {
8592
const END_FILE_MIN_BLANK_LENGTH: i32 = 4 + 4;
8693

8794
pub(crate) struct DefaultAppendMessageCallback {
88-
pub msg_store_item_memory: bytes::BytesMut,
89-
pub crc32_reserved_length: i32,
90-
pub message_store_config: Arc<MessageStoreConfig>,
95+
msg_store_item_memory: bytes::BytesMut,
96+
crc32_reserved_length: i32,
97+
message_store_config: Arc<MessageStoreConfig>,
98+
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
9199
}
92100

93101
impl DefaultAppendMessageCallback {
94-
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
102+
pub fn new(
103+
message_store_config: Arc<MessageStoreConfig>,
104+
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
105+
) -> Self {
95106
Self {
96107
msg_store_item_memory: bytes::BytesMut::with_capacity(
97108
END_FILE_MIN_BLANK_LENGTH as usize,
98109
),
99110
crc32_reserved_length: CRC32_RESERVED_LEN,
100111
message_store_config,
112+
topic_config_table,
101113
}
102114
}
103115
}
104116

117+
impl DefaultAppendMessageCallback {
118+
fn get_message_num(&self, msg_inner: &MessageExtBrokerInner) -> i16 {
119+
let mut message_num = 1i16;
120+
let cq_type = self.get_cq_type(msg_inner);
121+
if MessageSysFlag::check(msg_inner.sys_flag(), MessageSysFlag::INNER_BATCH_FLAG)
122+
|| CQType::BatchCQ == cq_type
123+
{
124+
if let Some(key) = msg_inner.property(MessageConst::PROPERTY_INNER_NUM) {
125+
message_num = key.parse::<i16>().unwrap_or(1);
126+
}
127+
}
128+
message_num
129+
}
130+
131+
fn get_cq_type(&self, msg_inner: &MessageExtBrokerInner) -> CQType {
132+
let topic_config = self
133+
.topic_config_table
134+
.lock()
135+
.get(msg_inner.message_ext_inner.topic())
136+
.cloned();
137+
QueueTypeUtils::get_cq_type(&topic_config)
138+
}
139+
}
140+
105141
#[allow(unused_variables)]
106142
impl AppendMessageCallback for DefaultAppendMessageCallback {
107143
fn do_append(
108-
&mut self,
144+
&self,
109145
file_from_offset: i64,
110-
file_wrote_position: i64,
146+
mapped_file: &LocalMappedFile,
111147
max_blank: i32,
112148
msg_inner: &mut MessageExtBrokerInner,
113149
put_message_context: &PutMessageContext,
@@ -122,14 +158,14 @@ impl AppendMessageCallback for DefaultAppendMessageCallback {
122158
}
123159

124160
let msg_len = i32::from_le_bytes(pre_encode_buffer[0..4].try_into().unwrap());
125-
let wrote_offset = file_from_offset + file_wrote_position;
161+
let wrote_offset = file_from_offset + mapped_file.wrote_position() as i64;
126162

127163
let msg_id =
128164
message_utils::build_message_id(msg_inner.message_ext_inner.store_host, wrote_offset);
129165

130166
let mut queue_offset = msg_inner.queue_offset();
131167
//let message_num = CommitLog::get_message_num(msg_inner);
132-
let message_num = 1;
168+
let message_num = self.get_message_num(msg_inner);
133169
match MessageSysFlag::get_transaction_value(msg_inner.sys_flag()) {
134170
MessageSysFlag::TRANSACTION_PREPARED_TYPE
135171
| MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => queue_offset = 0,

‎rocketmq-store/src/log_file/commit_log.rs

+14-9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use rocketmq_common::{
2222
common::{
2323
attribute::cq_type::CQType,
2424
broker::broker_config::BrokerConfig,
25+
config::TopicConfig,
2526
message::{
2627
message_single::{tags_string2tags_code, MessageExtBrokerInner},
2728
MessageConst, MessageVersion,
@@ -86,6 +87,7 @@ pub struct CommitLog {
8687
dispatcher: CommitLogDispatcherDefault,
8788
confirm_offset: i64,
8889
store_checkpoint: StoreCheckpoint,
90+
append_message_callback: Arc<DefaultAppendMessageCallback>,
8991
}
9092

9193
impl CommitLog {
@@ -94,19 +96,24 @@ impl CommitLog {
9496
broker_config: Arc<BrokerConfig>,
9597
dispatcher: &CommitLogDispatcherDefault,
9698
store_checkpoint: StoreCheckpoint,
99+
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
97100
) -> Self {
98101
let enabled_append_prop_crc = message_store_config.enabled_append_prop_crc;
99102
let store_path = message_store_config.get_store_path_commit_log();
100103
let mapped_file_size = message_store_config.mapped_file_size_commit_log;
101104
Self {
102105
mapped_file_queue: MappedFileQueue::new(store_path, mapped_file_size as u64, None),
103-
message_store_config,
106+
message_store_config: message_store_config.clone(),
104107
broker_config,
105108
enabled_append_prop_crc,
106109
//local_file_message_store: None,
107110
dispatcher: dispatcher.clone(),
108111
confirm_offset: -1,
109112
store_checkpoint,
113+
append_message_callback: Arc::new(DefaultAppendMessageCallback::new(
114+
message_store_config,
115+
topic_config_table,
116+
)),
110117
}
111118
}
112119
}
@@ -189,14 +196,12 @@ impl CommitLog {
189196
};
190197
let topic_queue_key = generate_key(&msg);
191198
let mut put_message_context = PutMessageContext::new(topic_queue_key);
192-
let append_message_callback =
193-
DefaultAppendMessageCallback::new(self.message_store_config.clone());
194-
/* let result =
195-
append_message_callback.do_append(mapped_file.file_from_offset() as i64, 0, &mut msg);
196-
mapped_file.append_data(msg.encoded_buff.clone(), false);*/
197-
198-
let result =
199-
mapped_file.append_message(msg, append_message_callback, &mut put_message_context);
199+
200+
let result = mapped_file.append_message(
201+
msg,
202+
self.append_message_callback.as_ref(),
203+
&mut put_message_context,
204+
);
200205

201206
match result.status {
202207
AppendMessageStatus::PutOk => {

‎rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,10 @@ impl LocalMappedFile {
159159
write_success
160160
}
161161

162-
pub fn append_message(
162+
pub fn append_message<AMC: AppendMessageCallback>(
163163
&self,
164164
message: MessageExtBrokerInner,
165-
message_callback: impl AppendMessageCallback,
165+
message_callback: &AMC,
166166
put_message_context: &mut PutMessageContext,
167167
) -> AppendMessageResult {
168168
let mut message = message;
@@ -177,10 +177,9 @@ impl LocalMappedFile {
177177
..Default::default()
178178
};
179179
}
180-
let mut message_callback = message_callback;
181180
let append_message_result = message_callback.do_append(
182181
self.file_from_offset() as i64,
183-
current_pos as i64,
182+
self,
184183
(self.file_size - current_pos) as i32,
185184
&mut message,
186185
put_message_context,

‎rocketmq-store/src/message_store/default_message_store.rs

+1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ impl DefaultMessageStore {
123123
broker_config.clone(),
124124
&dispatcher,
125125
store_checkpoint.clone(),
126+
topic_config_table.clone(),
126127
);
127128
Self {
128129
message_store_config: message_store_config.clone(),

0 commit comments

Comments
 (0)