Skip to content

[ISSUE #768]♻️Replace SyncUnsafeCell with ArcCellWrapper🔥 #771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::cell::SyncUnsafeCell;
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
Expand All @@ -28,6 +28,7 @@ use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::common::constant::PermName;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_common::common::statistics::state_getter::StateGetter;
use rocketmq_common::ArcCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
Expand Down Expand Up @@ -65,6 +66,7 @@ use crate::processor::client_manage_processor::ClientManageProcessor;
use crate::processor::consumer_manage_processor::ConsumerManageProcessor;
use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler;
use crate::processor::pull_message_processor::PullMessageProcessor;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::processor::send_message_processor::SendMessageProcessor;
use crate::processor::BrokerRequestProcessor;
use crate::schedule::schedule_message_service::ScheduleMessageService;
Expand Down Expand Up @@ -374,16 +376,16 @@ impl BrokerRuntime {
self.broker_config.clone(),
self.message_store.as_ref().unwrap(),
);
let pull_message_result_handler =
Arc::new(SyncUnsafeCell::new(DefaultPullMessageResultHandler::new(
let mut pull_message_result_handler =
ArcCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new(
Arc::new(self.topic_config_manager.clone()),
Arc::new(self.consumer_offset_manager.clone()),
self.consumer_manager.clone(),
self.broadcast_offset_manager.clone(),
self.broker_stats_manager.clone(),
self.broker_config.clone(),
Arc::new(Default::default()),
)));
)) as Box<dyn PullMessageResultHandler>);
let message_store = Arc::new(self.message_store.as_ref().unwrap().clone());
let pull_message_processor = PullMessageProcessor::new(
pull_message_result_handler.clone(),
Expand Down Expand Up @@ -414,11 +416,13 @@ impl BrokerRuntime {
self.broker_config.clone(),
));

unsafe {
(*pull_message_result_handler.get()).set_pull_request_hold_service(Some(Arc::new(
let pull_message_result_handler = pull_message_result_handler.as_mut() as &mut dyn Any;
pull_message_result_handler
.downcast_mut::<DefaultPullMessageResultHandler>()
.unwrap()
.set_pull_request_hold_service(Some(Arc::new(
self.pull_request_hold_service.clone().unwrap(),
)));
}

self.message_store
.as_mut()
Expand Down
20 changes: 11 additions & 9 deletions rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

use std::cell::SyncUnsafeCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
Expand All @@ -27,6 +26,7 @@ use std::sync::Arc;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::ArcCellWrapper;
use rocketmq_remoting::protocol::DataVersion;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_store::log_file::MessageStore;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl ConsumerOffsetManager {
ConsumerOffsetManager {
broker_config,
consumer_offset_wrapper: ConsumerOffsetWrapper {
data_version: Arc::new(SyncUnsafeCell::new(DataVersion::default())),
data_version: ArcCellWrapper::new(DataVersion::default()),
offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
reset_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
pull_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -143,7 +143,9 @@ impl ConsumerOffsetManager {
} else {
0
};
unsafe { &mut *self.consumer_offset_wrapper.data_version.get() }
self.consumer_offset_wrapper
.data_version
.mut_from_ref()
.next_version_with(state_machine_version);
}
}
Expand Down Expand Up @@ -212,8 +214,8 @@ impl ConfigManager for ConsumerOffsetManager {
.offset_table
.write()
.extend(wrapper.offset_table.read().clone());
let data_version = unsafe { &mut *self.consumer_offset_wrapper.data_version.get() };
*data_version = unsafe { &*wrapper.data_version.get() }.clone();
let data_version = self.consumer_offset_wrapper.data_version.mut_from_ref();
*data_version = wrapper.data_version.as_ref().clone();
}
}
}
Expand Down Expand Up @@ -253,9 +255,9 @@ impl ConsumerOffsetManager {
}
}

#[derive(Debug, Default, Clone)]
#[derive(Default, Clone)]
struct ConsumerOffsetWrapper {
data_version: Arc<SyncUnsafeCell<DataVersion>>,
data_version: ArcCellWrapper<DataVersion>,
offset_table: Arc<parking_lot::RwLock<HashMap<String /* topic@group */, HashMap<i32, i64>>>>,
reset_offset_table: Arc<parking_lot::RwLock<HashMap<String, HashMap<i32, i64>>>>,
pull_offset_table:
Expand Down Expand Up @@ -287,7 +289,7 @@ impl ConsumerOffsetWrapper {
impl Serialize for ConsumerOffsetWrapper {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut state = serializer.serialize_struct("ConsumerOffsetWrapper", 5)?;
state.serialize_field("dataVersion", unsafe { &*self.data_version.get() })?;
state.serialize_field("dataVersion", self.data_version.as_ref())?;
state.serialize_field("offsetTable", &*self.offset_table.read())?;
state.serialize_field("resetOffsetTable", &*self.reset_offset_table.read())?;
state.serialize_field("pullOffsetTable", &*self.pull_offset_table.read())?;
Expand Down Expand Up @@ -390,7 +392,7 @@ impl<'de> Deserialize<'de> for ConsumerOffsetWrapper {
let pull_offset_table = pull_offset_table.unwrap_or_default();

Ok(ConsumerOffsetWrapper {
data_version: Arc::new(SyncUnsafeCell::new(data_version)),
data_version: ArcCellWrapper::new(data_version),
offset_table: Arc::new(parking_lot::RwLock::new(offset_table)),
reset_offset_table: Arc::new(parking_lot::RwLock::new(reset_offset_table)),
pull_offset_table: Arc::new(parking_lot::RwLock::new(pull_offset_table)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ impl TopicRequestHandler {
.broker_runtime_inner()
.register_increment_broker_data(
vec![topic_config],
self.inner.topic_config_manager.data_version().clone(),
self.inner
.topic_config_manager
.data_version()
.as_ref()
.clone(),
)
.await;
}
Expand Down Expand Up @@ -261,7 +265,11 @@ impl TopicRequestHandler {
.broker_runtime_inner()
.register_increment_broker_data(
request_body.topic_config_list,
self.inner.topic_config_manager.data_version().clone(),
self.inner
.topic_config_manager
.data_version()
.as_ref()
.clone(),
)
.await;
}
Expand Down Expand Up @@ -364,7 +372,13 @@ impl TopicRequestHandler {
.lock()
.clone(),
),
data_version: Some(self.inner.topic_config_manager.data_version().clone()),
data_version: Some(
self.inner
.topic_config_manager
.data_version()
.as_ref()
.clone(),
),
..Default::default()
};
let content = topic_config_and_mapping_serialize_wrapper.to_json();
Expand Down
17 changes: 6 additions & 11 deletions rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::cell::SyncUnsafeCell;
use std::sync::Arc;

use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::constant::PermName;
use rocketmq_common::common::filter::expression_type::ExpressionType;
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
use rocketmq_common::common::FAQUrl;
use rocketmq_common::ArcCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
Expand Down Expand Up @@ -68,7 +68,7 @@ use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager

#[derive(Clone)]
pub struct PullMessageProcessor<MS> {
pull_message_result_handler: Arc<SyncUnsafeCell<dyn PullMessageResultHandler>>,
pull_message_result_handler: ArcCellWrapper<Box<dyn PullMessageResultHandler>>,
broker_config: Arc<BrokerConfig>,
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
topic_config_manager: Arc<TopicConfigManager>,
Expand All @@ -84,7 +84,7 @@ pub struct PullMessageProcessor<MS> {

impl<MS> PullMessageProcessor<MS> {
pub fn new(
pull_message_result_handler: Arc<SyncUnsafeCell<dyn PullMessageResultHandler>>,
pull_message_result_handler: ArcCellWrapper<Box<dyn PullMessageResultHandler>>,
broker_config: Arc<BrokerConfig>,
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
topic_config_manager: Arc<TopicConfigManager>,
Expand Down Expand Up @@ -758,7 +758,7 @@ where
}
};
if let Some(get_message_result) = get_message_result {
return self.pull_message_result_handler().handle(
return self.pull_message_result_handler.handle(
get_message_result,
request,
request_header,
Expand All @@ -776,10 +776,6 @@ where
None
}

fn pull_message_result_handler(&self) -> &dyn PullMessageResultHandler {
unsafe { &*self.pull_message_result_handler.get() }
}

fn query_broadcast_pull_init_offset(
&mut self,
topic: &str,
Expand Down Expand Up @@ -848,9 +844,8 @@ where
let command = response.set_opaque(opaque).mark_response_type();
match ctx.upgrade() {
None => {}
Some(ctx) => {
let ctx_ref = unsafe { &mut *ctx.get() };
ctx_ref.write(command).await;
Some(mut ctx) => {
ctx.write(command).await;
}
}
}
Expand Down
31 changes: 14 additions & 17 deletions rocketmq-broker/src/topic/manager/topic_config_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

use std::cell::SyncUnsafeCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -29,6 +28,7 @@ use rocketmq_common::common::constant::PermName;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::ArcCellWrapper;
use rocketmq_common::TopicAttributes::ALL;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
Expand All @@ -45,7 +45,7 @@ use crate::broker_runtime::BrokerRuntimeInner;

pub(crate) struct TopicConfigManager {
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
data_version: Arc<SyncUnsafeCell<DataVersion>>,
data_version: ArcCellWrapper<DataVersion>,
broker_config: Arc<BrokerConfig>,
message_store: Option<DefaultMessageStore>,
topic_config_table_lock: Arc<parking_lot::ReentrantMutex<()>>,
Expand Down Expand Up @@ -74,7 +74,7 @@ impl TopicConfigManager {
) -> Self {
let mut manager = Self {
topic_config_table: Arc::new(parking_lot::Mutex::new(HashMap::new())),
data_version: Arc::new(SyncUnsafeCell::new(DataVersion::default())),
data_version: ArcCellWrapper::new(DataVersion::default()),
broker_config,
message_store: None,
topic_config_table_lock: Default::default(),
Expand Down Expand Up @@ -231,7 +231,7 @@ impl TopicConfigManager {
topic_queue_mapping_info_map: HashMap<String, TopicQueueMappingInfo>,
) -> TopicConfigAndMappingSerializeWrapper {
if self.broker_config.enable_split_registration {
self.data_version_mut().next_version();
self.data_version.mut_from_ref().next_version();
}
TopicConfigAndMappingSerializeWrapper {
topic_config_table: Some(topic_config_table),
Expand Down Expand Up @@ -296,7 +296,7 @@ impl TopicConfigManager {
default_topic, topic_config, remote_address
);
self.put_topic_config(topic_config.clone());
self.data_version_mut().next_version_with(
self.data_version.mut_from_ref().next_version_with(
self.message_store
.as_ref()
.unwrap()
Expand Down Expand Up @@ -357,7 +357,7 @@ impl TopicConfigManager {
config.order = is_order;

self.put_topic_config(config.clone());
self.data_version_mut().next_version_with(
self.data_version.mut_from_ref().next_version_with(
self.message_store
.as_ref()
.unwrap()
Expand Down Expand Up @@ -410,7 +410,8 @@ impl TopicConfigManager {
} else {
0
};
self.data_version_mut()
self.data_version
.mut_from_ref()
.next_version_with(state_machine_version);
self.persist();
} else {
Expand Down Expand Up @@ -441,7 +442,7 @@ impl TopicConfigManager {
}
}

self.data_version_mut().next_version_with(
self.data_version.mut_from_ref().next_version_with(
self.message_store
.as_ref()
.unwrap()
Expand Down Expand Up @@ -508,7 +509,7 @@ impl TopicConfigManager {
config.topic_sys_flag = 0;
info!("create new topic {:?}", config);
self.put_topic_config(config.clone());
self.data_version_mut().next_version_with(
self.data_version.mut_from_ref().next_version_with(
self.message_store
.as_ref()
.unwrap()
Expand All @@ -530,12 +531,8 @@ impl TopicConfigManager {
self.topic_config_table.lock().contains_key(topic)
}

pub fn data_version(&self) -> &DataVersion {
unsafe { &*self.data_version.get() }
}

fn data_version_mut(&self) -> &mut DataVersion {
unsafe { &mut *self.data_version.get() }
pub fn data_version(&self) -> ArcCellWrapper<DataVersion> {
self.data_version.clone()
}

#[inline]
Expand All @@ -551,7 +548,7 @@ impl ConfigManager for TopicConfigManager {

fn encode_pretty(&self, pretty_format: bool) -> String {
let topic_config_table = self.topic_config_table.lock().clone();
let version = self.data_version().clone();
let version = self.data_version().as_ref().clone();
match pretty_format {
true => TopicConfigSerializeWrapper::new(Some(topic_config_table), Some(version))
.to_json_pretty(),
Expand All @@ -569,7 +566,7 @@ impl ConfigManager for TopicConfigManager {
let wrapper = SerdeJsonUtils::from_json_str::<TopicConfigSerializeWrapper>(json_string)
.expect("Decode TopicConfigSerializeWrapper from json failed");
if let Some(value) = wrapper.data_version() {
self.data_version_mut().assign_new_one(value);
self.data_version.mut_from_ref().assign_new_one(value);
}
if let Some(map) = wrapper.topic_config_table() {
for (key, value) in map {
Expand Down
Loading
Loading