From a21ca0afc6f7fe7d89ec28849e11ee0bc9a90442 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 19 May 2024 22:31:46 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#368]=F0=9F=9A=80Optimize=20send=20mes?= =?UTF-8?q?sage=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 8 ++- rocketmq-broker/src/broker_runtime.rs | 1 + .../src/processor/send_message_processor.rs | 2 +- rocketmq-common/Cargo.toml | 3 +- rocketmq-common/src/common.rs | 1 + rocketmq-common/src/common/hasher.rs | 17 +++++ .../src/common/hasher/string_hasher.rs | 66 +++++++++++++++++++ .../src/common/message/message_single.rs | 11 ++-- rocketmq-common/src/common/mix_all.rs | 7 +- rocketmq-common/src/utils/util_all.rs | 37 ++++++++--- .../src/base/append_message_callback.rs | 31 ++++----- .../src/consume_queue/mapped_file_queue.rs | 6 +- .../src/log_file/mapped_file/default_impl.rs | 17 +++-- .../message_store/default_message_store.rs | 59 ++++++++++------- .../queue/local_file_consume_queue_store.rs | 2 +- .../src/queue/single_consume_queue.rs | 10 +-- 16 files changed, 204 insertions(+), 74 deletions(-) create mode 100644 rocketmq-common/src/common/hasher.rs create mode 100644 rocketmq-common/src/common/hasher/string_hasher.rs diff --git a/Cargo.toml b/Cargo.toml index b5bb78de..3623471e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,8 @@ Unofficial Rust implementation of Apache RocketMQ """ [workspace.dependencies] tokio = { version = "1.35", features = ["full"] } -tokio-util = {version = "0.7.10",features = ["full"]} -tokio-stream = {version = "0.1.14",features = ["full"]} +tokio-util = { version = "0.7.10", features = ["full"] } +tokio-stream = { version = "0.1.14", features = ["full"] } log = "0.4.0" env_logger = "0.11.2" @@ -53,4 +53,6 @@ config = "0.14" parking_lot = "0.12" dirs = "5.0" -trait-variant = "0.1.2" \ No newline at end of file +trait-variant = "0.1.2" + +once_cell = "1.19.0" \ No newline at end of file diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index fdec1877..81eb16a4 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -174,6 +174,7 @@ impl BrokerRuntime { if let Some(message_store) = &mut self.message_store { message_store.shutdown() } + if let Some(runtime) = self.broker_runtime.take() { runtime.shutdown(); } diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 8ff357bc..82dacce0 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -291,7 +291,7 @@ impl SendMessageProcessor { message_ext.tags_code = MessageExtBrokerInner::tags_string2tags_code( &topic_config.topic_filter_type, message_ext.get_tags().unwrap_or("".to_string()).as_str(), - ) as i64; + ); message_ext.message_ext_inner.born_timestamp = request_header.born_timestamp; message_ext.message_ext_inner.born_host = ctx.remoting_address(); diff --git a/rocketmq-common/Cargo.toml b/rocketmq-common/Cargo.toml index ea4d5ddd..7e2b219f 100644 --- a/rocketmq-common/Cargo.toml +++ b/rocketmq-common/Cargo.toml @@ -49,4 +49,5 @@ local-ip-address = "0.6.1" chrono = "0.4.38" log = "0.4.20" -parking_lot = { workspace = true } \ No newline at end of file +parking_lot = { workspace = true } +once_cell = { workspace = true } \ No newline at end of file diff --git a/rocketmq-common/src/common.rs b/rocketmq-common/src/common.rs index c699830c..f9b6ef62 100644 --- a/rocketmq-common/src/common.rs +++ b/rocketmq-common/src/common.rs @@ -33,6 +33,7 @@ pub mod consumer; mod faq; pub mod filter; pub mod future; +pub mod hasher; pub mod macros; pub mod message; pub mod mix_all; diff --git a/rocketmq-common/src/common/hasher.rs b/rocketmq-common/src/common/hasher.rs new file mode 100644 index 00000000..d2e80a2a --- /dev/null +++ b/rocketmq-common/src/common/hasher.rs @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub mod string_hasher; diff --git a/rocketmq-common/src/common/hasher/string_hasher.rs b/rocketmq-common/src/common/hasher/string_hasher.rs new file mode 100644 index 00000000..2ced624e --- /dev/null +++ b/rocketmq-common/src/common/hasher/string_hasher.rs @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::hash::Hasher; + +//Compatible with Java String's hash code +pub struct JavaStringHasher { + state: i32, +} + +impl Default for JavaStringHasher { + fn default() -> Self { + Self::new() + } +} + +impl JavaStringHasher { + pub fn new() -> Self { + JavaStringHasher { state: 0 } + } + + pub fn hash_str(&mut self, s: &str) -> i32 { + if self.state == 0 && !s.is_empty() { + for c in s.chars() { + self.state = self.state.wrapping_mul(31).wrapping_add(c as i32); + } + } + self.state + } +} + +impl Hasher for JavaStringHasher { + fn finish(&self) -> u64 { + self.state as u64 + } + + fn write(&mut self, bytes: &[u8]) { + for &byte in bytes { + self.state = self.state.wrapping_mul(31).wrapping_add(byte as i32); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_java_string_hasher() { + let mut hasher = JavaStringHasher::new(); + let i = hasher.hash_str("hello world"); + assert_eq!(i, 1794106052); + } +} diff --git a/rocketmq-common/src/common/message/message_single.rs b/rocketmq-common/src/common/message/message_single.rs index 06d655a6..7d7435f1 100644 --- a/rocketmq-common/src/common/message/message_single.rs +++ b/rocketmq-common/src/common/message/message_single.rs @@ -25,6 +25,7 @@ use bytes::{Buf, BufMut}; use crate::{ common::{ + hasher::string_hasher::JavaStringHasher, message::{MessageConst, MessageTrait, MessageVersion}, sys_flag::message_sys_flag::MessageSysFlag, TopicFilterType, @@ -414,13 +415,11 @@ impl MessageExtBrokerInner { self.message_ext_inner.queue_offset() } - pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> u64 { + pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> i64 { if tags.is_empty() { return 0; } - let mut hasher = DefaultHasher::new(); - tags.hash(&mut hasher); - hasher.finish() + JavaStringHasher::new().hash_str(tags) as i64 } pub fn get_tags(&self) -> Option { @@ -444,7 +443,5 @@ pub fn tags_string2tags_code(tags: Option<&String>) -> i64 { if tags.is_empty() { return 0; } - let mut hasher = DefaultHasher::new(); - tags.hash(&mut hasher); - hasher.finish() as i64 + JavaStringHasher::new().hash_str(tags.as_str()) as i64 } diff --git a/rocketmq-common/src/common/mix_all.rs b/rocketmq-common/src/common/mix_all.rs index 33a48b38..42cf04d0 100644 --- a/rocketmq-common/src/common/mix_all.rs +++ b/rocketmq-common/src/common/mix_all.rs @@ -15,6 +15,10 @@ * limitations under the License. */ +use std::env; + +use once_cell::sync::Lazy; + pub const ROCKETMQ_HOME_ENV: &str = "ROCKETMQ_HOME"; pub const ROCKETMQ_HOME_PROPERTY: &str = "rocketmq.home.dir"; pub const NAMESRV_ADDR_ENV: &str = "NAMESRV_ADDR"; @@ -67,7 +71,8 @@ pub const ZONE_MODE: &str = "__ZONE_MODE"; pub const LOGICAL_QUEUE_MOCK_BROKER_PREFIX: &str = "__syslo__"; pub const METADATA_SCOPE_GLOBAL: &str = "__global__"; pub const LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST: &str = "__syslo__none__"; -pub const MULTI_PATH_SPLITTER: &str = "rocketmq.broker.multiPathSplitter"; +pub static MULTI_PATH_SPLITTER: Lazy = + Lazy::new(|| env::var("rocketmq.broker.multiPathSplitter").unwrap_or_else(|_| ",".to_string())); pub fn is_sys_consumer_group(consumer_group: &str) -> bool { consumer_group.starts_with(CID_RMQ_SYS_PREFIX) diff --git a/rocketmq-common/src/utils/util_all.rs b/rocketmq-common/src/utils/util_all.rs index 9da36912..61d6dad9 100644 --- a/rocketmq-common/src/utils/util_all.rs +++ b/rocketmq-common/src/utils/util_all.rs @@ -15,9 +15,16 @@ * limitations under the License. */ -use std::path::PathBuf; +use std::{ + env, fs, + path::{Path, PathBuf}, +}; use chrono::{DateTime, Datelike, TimeZone, Timelike, Utc}; +use once_cell::sync::Lazy; +use tracing::info; + +use crate::common::mix_all::MULTI_PATH_SPLITTER; const HEX_ARRAY: [char; 16] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', @@ -95,16 +102,28 @@ pub fn offset_to_file_name(offset: u64) -> String { format!("{:020}", offset) } -/*pub fn ensure_dir_ok(dir: impl AsRef) -> Result<(), std::io::Error> { - match dir.as_ref().exists() { - true => Ok(()), - false => Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("{:?}", dir.as_ref()), - )), +pub fn ensure_dir_ok(dir_name: &str) { + if !dir_name.is_empty() { + let multi_path_splitter = MULTI_PATH_SPLITTER.as_str(); + if dir_name.contains(multi_path_splitter) { + for dir in dir_name.trim().split(&multi_path_splitter) { + create_dir_if_not_exist(dir); + } + } else { + create_dir_if_not_exist(dir_name); + } } -}*/ +} +fn create_dir_if_not_exist(dir_name: &str) { + let path = Path::new(dir_name); + if !path.exists() { + match fs::create_dir_all(path) { + Ok(_) => info!("{} mkdir OK", dir_name), + Err(_) => info!("{} mkdir Failed", dir_name), + } + } +} #[cfg(test)] mod tests { use super::*; diff --git a/rocketmq-store/src/base/append_message_callback.rs b/rocketmq-store/src/base/append_message_callback.rs index 131dff62..118cfbee 100644 --- a/rocketmq-store/src/base/append_message_callback.rs +++ b/rocketmq-store/src/base/append_message_callback.rs @@ -16,7 +16,7 @@ */ use std::{collections::HashMap, sync::Arc}; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use rocketmq_common::{ common::{ attribute::cq_type::CQType, @@ -36,7 +36,7 @@ use crate::{ }, config::message_store_config::MessageStoreConfig, log_file::{ - commit_log::{CommitLog, CRC32_RESERVED_LEN}, + commit_log::{CommitLog, BLANK_MAGIC_CODE, CRC32_RESERVED_LEN}, mapped_file::MappedFile, }, }; @@ -101,7 +101,7 @@ pub trait AppendMessageCallback { const END_FILE_MIN_BLANK_LENGTH: i32 = 4 + 4; pub(crate) struct DefaultAppendMessageCallback { - msg_store_item_memory: bytes::BytesMut, + //msg_store_item_memory: bytes::BytesMut, crc32_reserved_length: i32, message_store_config: Arc, topic_config_table: Arc>>, @@ -113,9 +113,9 @@ impl DefaultAppendMessageCallback { topic_config_table: Arc>>, ) -> Self { Self { - msg_store_item_memory: bytes::BytesMut::with_capacity( + /* msg_store_item_memory: bytes::BytesMut::with_capacity( END_FILE_MIN_BLANK_LENGTH as usize, - ), + ),*/ crc32_reserved_length: CRC32_RESERVED_LEN, message_store_config, topic_config_table, @@ -178,19 +178,14 @@ impl AppendMessageCallback for DefaultAppendMessageCallback { match MessageSysFlag::get_transaction_value(msg_inner.sys_flag()) { MessageSysFlag::TRANSACTION_PREPARED_TYPE | MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => queue_offset = 0, - // MessageSysFlag::TRANSACTION_NOT_TYPE | MessageSysFlag::TRANSACTION_COMMIT_TYPE | _ => - // {} _ => {} } if (msg_len + END_FILE_MIN_BLANK_LENGTH) > max_blank { - /*self.msg_store_item_memory.borrow_mut().clear(); - self.msg_store_item_memory.borrow_mut().put_i32(max_blank); - self.msg_store_item_memory - .borrow_mut() - .put_i32(BLANK_MAGIC_CODE); - let bytes = self.msg_store_item_memory.borrow_mut().split().freeze(); - mapped_file.append_message_bytes(&bytes);*/ + let mut bytes = BytesMut::with_capacity(END_FILE_MIN_BLANK_LENGTH as usize); + bytes.put_i32(max_blank); + bytes.put_i32(BLANK_MAGIC_CODE); + mapped_file.append_message_bytes(&bytes.freeze()); return AppendMessageResult { status: AppendMessageStatus::EndOfFile, wrote_offset, @@ -198,14 +193,15 @@ impl AppendMessageCallback for DefaultAppendMessageCallback { msg_id, store_timestamp: msg_inner.store_timestamp(), logics_offset: queue_offset, + msg_num: message_num as i32, ..Default::default() }; } let mut pos = 4 + 4 + 4 + 4 + 4; - pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_le_bytes()); + pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_be_bytes()); pos += 8; - pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&wrote_offset.to_le_bytes()); + pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&wrote_offset.to_be_bytes()); let ip_len = if msg_inner.sys_flag() & MessageSysFlag::BORNHOST_V6_FLAG == 0 { 4 + 4 } else { @@ -213,7 +209,7 @@ impl AppendMessageCallback for DefaultAppendMessageCallback { }; pos += 8 + 4 + 8 + ip_len; pre_encode_buffer[pos..(pos + 8)] - .copy_from_slice(&msg_inner.store_timestamp().to_le_bytes()); + .copy_from_slice(&msg_inner.store_timestamp().to_be_bytes()); // msg_inner.encoded_buff = pre_encode_buffer; let bytes = Bytes::from(pre_encode_buffer); @@ -225,6 +221,7 @@ impl AppendMessageCallback for DefaultAppendMessageCallback { msg_id, store_timestamp: msg_inner.store_timestamp(), logics_offset: queue_offset, + msg_num: message_num as i32, ..Default::default() } } diff --git a/rocketmq-store/src/consume_queue/mapped_file_queue.rs b/rocketmq-store/src/consume_queue/mapped_file_queue.rs index 3f4ef5d5..57e8f90a 100644 --- a/rocketmq-store/src/consume_queue/mapped_file_queue.rs +++ b/rocketmq-store/src/consume_queue/mapped_file_queue.rs @@ -294,7 +294,11 @@ impl MappedFileQueue { >= last_mapped_file.as_ref().unwrap().get_file_from_offset() as i64 + self.mapped_file_size as i64 { - None + if return_first_on_not_found { + first_mapped_file + } else { + None + } } else { let index = offset as usize / self.mapped_file_size as usize - first_mapped_file.as_ref().unwrap().get_file_from_offset() as usize diff --git a/rocketmq-store/src/log_file/mapped_file/default_impl.rs b/rocketmq-store/src/log_file/mapped_file/default_impl.rs index 93b9de17..d4086725 100644 --- a/rocketmq-store/src/log_file/mapped_file/default_impl.rs +++ b/rocketmq-store/src/log_file/mapped_file/default_impl.rs @@ -28,8 +28,9 @@ use std::{ use bytes::{Bytes, BytesMut}; use memmap2::MmapMut; -use rocketmq_common::common::message::{ - message_batch::MessageExtBatch, message_single::MessageExtBrokerInner, +use rocketmq_common::{ + common::message::{message_batch::MessageExtBatch, message_single::MessageExtBrokerInner}, + UtilAll::ensure_dir_ok, }; use tracing::{debug, error, info, warn}; @@ -89,6 +90,7 @@ impl DefaultMappedFile { pub fn new(file_name: String, file_size: u64) -> Self { let file_from_offset = Self::get_file_from_offset(&file_name); let path_buf = PathBuf::from(file_name.clone()); + ensure_dir_ok(path_buf.parent().unwrap().to_str().unwrap()); let file = OpenOptions::new() .read(true) .write(true) @@ -236,13 +238,16 @@ impl MappedFile for DefaultMappedFile { ..Default::default() }; } - message_callback.do_append( + let result = message_callback.do_append( self.file_from_offset as i64, self, (self.file_size - current_pos) as i32, &mut message, put_message_context, - ) + ); + self.store_timestamp + .store(message.store_timestamp(), Ordering::Release); + result } fn append_messages( @@ -463,10 +468,10 @@ impl MappedFile for DefaultMappedFile { fn get_read_position(&self) -> i32 { match self.transient_store_pool { - None => self.wrote_position.load(Ordering::Relaxed), + None => self.wrote_position.load(Ordering::Acquire), Some(_) => { //need to optimize - self.wrote_position.load(Ordering::Relaxed) + self.wrote_position.load(Ordering::Acquire) } } } diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index ce9d213f..58daf7bb 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -385,7 +385,7 @@ impl MessageStore for DefaultMessageStore { } fn shutdown(&mut self) { - if !self.shutdown.load(Ordering::Relaxed) { + if !self.shutdown.load(Ordering::Acquire) { self.shutdown.store(true, Ordering::SeqCst); self.reput_message_service.shutdown(); self.commit_log.shutdown(); @@ -418,12 +418,6 @@ impl MessageStore for DefaultMessageStore { } async fn put_message(&mut self, msg: MessageExtBrokerInner) -> PutMessageResult { - // let guard = self.put_message_hook_list.lock().await; - // for hook in guard.iter() { - // if let Some(result) = hook.execute_before_put_message(&msg.message_ext_inner) { - // return result; - // } - // } for hook in self.put_message_hook_list.iter() { if let Some(result) = hook.execute_before_put_message(&msg.message_ext_inner) { return result; @@ -473,12 +467,12 @@ impl CommitLogDispatcher for CommitLogDispatcherDefault { #[derive(Clone)] struct ReputMessageService { tx: Option>>, - reput_from_offset: Option, + reput_from_offset: Option>, } impl ReputMessageService { pub fn set_reput_from_offset(&mut self, reput_from_offset: i64) { - self.reput_from_offset = Some(reput_from_offset); + self.reput_from_offset = Some(Arc::new(AtomicI64::new(reput_from_offset))); } pub fn start( @@ -488,7 +482,7 @@ impl ReputMessageService { dispatcher: CommitLogDispatcherDefault, ) { let mut inner = ReputMessageServiceInner { - reput_from_offset: Arc::new(AtomicI64::new(self.reput_from_offset.unwrap())), + reput_from_offset: self.reput_from_offset.clone().unwrap(), commit_log, message_store_config, dispatcher, @@ -497,18 +491,32 @@ impl ReputMessageService { self.tx = Some(Arc::new(tx)); let handle = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(1)); + let mut break_flag; loop { + inner.do_reput().await; + interval.tick().await; tokio::select! { - _ = inner.do_reput() =>{ - - } - _ = interval.tick() => { - - } _ = rx.recv() => { - break; + let mut index = 0; + while index < 50 && inner.is_commit_log_available() { + tokio::time::sleep(Duration::from_millis(500)).await; + if inner.is_commit_log_available() { + warn!( + "shutdown ReputMessageService, but CommitLog have not finish to be \ + dispatched, CommitLog max offset={}, reputFromOffset={}", + inner.commit_log.get_max_offset(), + inner.reput_from_offset.load(Ordering::Relaxed) + ); + } + index += 1; + } + info!("ReputMessageService shutdown now......"); + break_flag = true; } } + if break_flag { + break; + } } }); } @@ -541,7 +549,7 @@ struct ReputMessageServiceInner { impl ReputMessageServiceInner { pub async fn do_reput(&mut self) { - let reput_from_offset = self.reput_from_offset.load(Ordering::Relaxed); + let reput_from_offset = self.reput_from_offset.load(Ordering::Acquire); if reput_from_offset < self.commit_log.get_min_offset() { warn!( "The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate \ @@ -550,7 +558,7 @@ impl ReputMessageServiceInner { self.commit_log.get_min_offset() ); self.reput_from_offset - .store(self.commit_log.get_min_offset(), Ordering::SeqCst); + .store(self.commit_log.get_min_offset(), Ordering::Release); } let mut do_next = true; while do_next && self.is_commit_log_available() { @@ -561,12 +569,13 @@ impl ReputMessageServiceInner { } let result = result.unwrap(); self.reput_from_offset - .store(result.start_offset as i64, Ordering::Release); + .store(result.start_offset as i64, Ordering::SeqCst); let mut read_size = 0i32; let mapped_file = result.mapped_file.as_ref().unwrap(); let start_pos = (result.start_offset % mapped_file.get_file_size()) as i32; loop { let size = mapped_file.get_bytes((start_pos + read_size) as usize, 4); + println!("read_size={}", read_size); if size.is_none() { do_next = false; break; @@ -587,13 +596,16 @@ impl ReputMessageServiceInner { false, &self.message_store_config, ); - if self.reput_from_offset.load(Ordering::Relaxed) + dispatch_request.msg_size as i64 + if self.reput_from_offset.load(Ordering::Acquire) + dispatch_request.msg_size as i64 > self.commit_log.get_confirm_offset() { do_next = false; break; } - + println!( + "=============================read_size={}", + dispatch_request.msg_size + ); if dispatch_request.success { match dispatch_request.msg_size.cmp(&0) { std::cmp::Ordering::Greater => { @@ -613,6 +625,7 @@ impl ReputMessageServiceInner { .roll_next_file(self.reput_from_offset.load(Ordering::Relaxed)), Ordering::SeqCst, ); + read_size = result.size; } std::cmp::Ordering::Less => {} } @@ -631,7 +644,7 @@ impl ReputMessageServiceInner { } if !(read_size < result.size - && self.reput_from_offset.load(Ordering::Relaxed) + && self.reput_from_offset.load(Ordering::Acquire) < self.commit_log.get_confirm_offset() && do_next) { diff --git a/rocketmq-store/src/queue/local_file_consume_queue_store.rs b/rocketmq-store/src/queue/local_file_consume_queue_store.rs index 3f93261c..20444862 100644 --- a/rocketmq-store/src/queue/local_file_consume_queue_store.rs +++ b/rocketmq-store/src/queue/local_file_consume_queue_store.rs @@ -209,7 +209,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { fn put_message_position_info_wrapper(&self, request: &DispatchRequest) { let cq = self.find_or_create_consume_queue(request.topic.as_str(), request.queue_id); self.put_message_position_info_wrapper_with_cq(cq.lock().as_mut(), request); - println!("put_message_position_info_wrapper-----{}", request.topic) + // println!("put_message_position_info_wrapper-----{}", request.topic) } fn put_message_position_info_wrapper_with_cq( diff --git a/rocketmq-store/src/queue/single_consume_queue.rs b/rocketmq-store/src/queue/single_consume_queue.rs index 66ba8abf..1b4c86bc 100644 --- a/rocketmq-store/src/queue/single_consume_queue.rs +++ b/rocketmq-store/src/queue/single_consume_queue.rs @@ -124,7 +124,7 @@ impl ConsumeQueue { impl ConsumeQueue { pub fn set_max_physic_offset(&self, max_physic_offset: i64) { self.max_physic_offset - .store(max_physic_offset, std::sync::atomic::Ordering::SeqCst); + .store(max_physic_offset, std::sync::atomic::Ordering::Release); } pub fn truncate_dirty_logic_files_handler(&mut self, phy_offset: i64, delete_file: bool) { @@ -226,9 +226,11 @@ impl ConsumeQueue { ) -> bool { if offset + size as i64 <= self.get_max_physic_offset() { warn!( - "Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", + "Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}, \ + size={}", self.get_max_physic_offset(), - offset + offset, + size ); return true; } @@ -527,7 +529,7 @@ impl ConsumeQueueTrait for ConsumeQueue { } fn get_max_physic_offset(&self) -> i64 { - self.max_physic_offset.load(Ordering::Relaxed) + self.max_physic_offset.load(Ordering::Acquire) } fn get_min_logic_offset(&self) -> i64 {