Skip to content

Commit cd94059

Browse files
committed
[ISSUE #312]✨Support send message and send message v2(request code:10,310)-5
1 parent 2f5749c commit cd94059

File tree

7 files changed

+29
-30
lines changed

7 files changed

+29
-30
lines changed

rocketmq-broker/src/broker_runtime.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,9 @@ impl BrokerRuntime {
165165
fn initialize_message_store(&mut self) -> bool {
166166
if self.message_store_config.store_type == StoreType::LocalFile {
167167
info!("Use local file as message store");
168-
self.message_store = Some(Arc::new(LocalFileMessageStore::new(self.message_store_config.clone())));
168+
self.message_store = Some(Arc::new(LocalFileMessageStore::new(
169+
self.message_store_config.clone(),
170+
)));
169171
} else if self.message_store_config.store_type == StoreType::RocksDB {
170172
info!("Use RocksDB as message store");
171173
} else {

rocketmq-broker/src/processor/send_message_processor.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ impl<MS: MessageStore + Send> SendMessageProcessor<MS> {
215215
);
216216
}
217217

218-
let mut response_header = SendMessageResponseHeader::default();
219218
let mut topic_config = self
220219
.topic_config_manager
221220
.select_topic_config(request_header.topic().as_str())
@@ -320,7 +319,6 @@ impl<MS: MessageStore + Send> SendMessageProcessor<MS> {
320319
response,
321320
&request,
322321
topic.as_str(),
323-
&mut response_header,
324322
queue_id,
325323
)
326324
}
@@ -331,14 +329,14 @@ impl<MS: MessageStore + Send> SendMessageProcessor<MS> {
331329
response: RemotingCommand,
332330
request: &RemotingCommand,
333331
topic: &str,
334-
response_header: &mut SendMessageResponseHeader,
335332
// send_message_context: &mut SendMessageContext,
336333
// ctx: ConnectionHandlerContext,
337334
queue_id_int: i32,
338335
//begin_time_millis: i64,
339336
// mapping_context: TopicQueueMappingContext,
340337
//message_type: TopicMessageType,
341338
) -> Option<RemotingCommand> {
339+
let mut response_header = SendMessageResponseHeader::default();
342340
let mut response = response;
343341
let mut send_ok = false;
344342
match put_message_result.put_message_status() {
@@ -421,6 +419,7 @@ impl<MS: MessageStore + Send> SendMessageProcessor<MS> {
421419
.logics_offset,
422420
);
423421
//response_header.set_transaction_id();
422+
response = response.set_command_custom_header(Some(Box::new(response_header)));
424423
Some(response)
425424
} else {
426425
unimplemented!()

rocketmq-store/src/base/message_result.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub struct AppendMessageResult {
4343
pub msg_num: i32,
4444
}
4545

46+
#[derive(Default)]
4647
pub struct PutMessageResult {
4748
put_message_status: PutMessageStatus,
4849
append_message_result: Option<AppendMessageResult>,
@@ -64,16 +65,6 @@ impl Default for AppendMessageResult {
6465
}
6566
}
6667

67-
impl Default for PutMessageResult {
68-
fn default() -> Self {
69-
Self{
70-
put_message_status: Default::default(),
71-
append_message_result: None,
72-
remote_put: false,
73-
}
74-
}
75-
}
76-
7768
impl PutMessageResult {
7869
pub fn new(
7970
put_message_status: PutMessageStatus,

rocketmq-store/src/config/message_store_config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -400,4 +400,4 @@ impl MessageStoreConfig {
400400
}
401401
self.store_path_commit_log.clone().unwrap()
402402
}
403-
}
403+
}

rocketmq-store/src/consume_queue/mapped_file_queue.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::{
18+
use std::{
1919
fs,
2020
path::{Path, PathBuf},
2121
sync::Arc,
@@ -139,9 +139,7 @@ impl MappedFileQueue {
139139
if self.mapped_files.is_empty() {
140140
return None;
141141
}
142-
self.mapped_files
143-
.last()
144-
.map_or(None, |last| Some(last.clone()))
142+
self.mapped_files.last().cloned()
145143
}
146144

147145
pub fn get_last_mapped_file_mut_start_offset(
@@ -157,7 +155,9 @@ impl MappedFileQueue {
157155
create_offset = start_offset as i64 - (start_offset as i64 % file_size);
158156
}
159157
Some(ref value) => {
160-
create_offset = value.lock().get_file_from_offset() as i64 + file_size;
158+
if value.lock().is_full() {
159+
create_offset = value.lock().get_file_from_offset() as i64 + file_size
160+
}
161161
}
162162
}
163163
if create_offset != -1 && need_create {

rocketmq-store/src/log_file/commit_log.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use std::{cell::Cell, ops::Deref, sync::Arc};
1919

2020
use rocketmq_common::{
2121
common::{
22-
attribute::cq_type::CQType, message::{message_single::MessageExtBrokerInner, MessageConst, MessageVersion}, mix_all
22+
attribute::cq_type::CQType,
23+
message::{message_single::MessageExtBrokerInner, MessageConst, MessageVersion},
24+
mix_all,
2325
},
2426
utils::time_utils,
2527
CRC32Utils::crc32,
@@ -150,8 +152,11 @@ impl CommitLog {
150152
append_message_callback.do_append(mapped_file.file_from_offset() as i64, 0, &mut msg);
151153
mapped_file.append_data(msg.encoded_buff.clone(), false);*/
152154

153-
let result =
154-
mapped_file.lock().append_message(msg, append_message_callback, &mut put_message_context);
155+
let result = mapped_file.lock().append_message(
156+
msg,
157+
append_message_callback,
158+
&mut put_message_context,
159+
);
155160

156161
match result.status {
157162
AppendMessageStatus::PutOk => {
@@ -179,13 +184,14 @@ impl CommitLog {
179184
.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
180185
}
181186

182-
pub fn get_message_num(&self,_msg_inner: &MessageExtBrokerInner) -> i16 {
183-
let mut message_num = 1i16;
187+
pub fn get_message_num(&self, _msg_inner: &MessageExtBrokerInner) -> i16 {
188+
// let mut message_num = 1i16;
184189

185-
message_num
190+
// message_num
191+
1
186192
}
187193

188-
fn get_cq_type(&self,_msg_inner:MessageExtBrokerInner) -> CQType{
194+
fn get_cq_type(&self, _msg_inner: MessageExtBrokerInner) -> CQType {
189195
CQType::SimpleCQ
190196
}
191197
}

rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ use rocketmq_common::common::message::message_single::MessageExtBrokerInner;
2828
use tracing::error;
2929

3030
use crate::base::{
31-
append_message_callback::AppendMessageCallback, message_result::AppendMessageResult, message_status_enum::AppendMessageStatus, put_message_context::PutMessageContext
31+
append_message_callback::AppendMessageCallback, message_result::AppendMessageResult,
32+
message_status_enum::AppendMessageStatus, put_message_context::PutMessageContext,
3233
};
3334

34-
3535
pub struct LocalMappedFile {
3636
//file information
3737
file_name: String,
@@ -55,7 +55,8 @@ impl LocalMappedFile {
5555
let file = OpenOptions::new()
5656
.read(true)
5757
.write(true)
58-
//.create(true)
58+
.create(true)
59+
.truncate(false)
5960
.open(&path_buf)
6061
.unwrap();
6162
file.set_len(file_size).unwrap();

0 commit comments

Comments
 (0)