Skip to content

Commit 0c99d64

Browse files
authored
[ISSUE #768]♻️Replace SyncUnsafeCell with ArcCellWrapper🔥 (#771)
1 parent d4e414d commit 0c99d64

File tree

13 files changed

+200
-180
lines changed

13 files changed

+200
-180
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::cell::SyncUnsafeCell;
17+
use std::any::Any;
1818
use std::collections::HashMap;
1919
use std::sync::atomic::AtomicBool;
2020
use std::sync::atomic::AtomicU64;
@@ -28,6 +28,7 @@ use rocketmq_common::common::config_manager::ConfigManager;
2828
use rocketmq_common::common::constant::PermName;
2929
use rocketmq_common::common::server::config::ServerConfig;
3030
use rocketmq_common::common::statistics::state_getter::StateGetter;
31+
use rocketmq_common::ArcCellWrapper;
3132
use rocketmq_common::TimeUtils::get_current_millis;
3233
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
3334
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
@@ -65,6 +66,7 @@ use crate::processor::client_manage_processor::ClientManageProcessor;
6566
use crate::processor::consumer_manage_processor::ConsumerManageProcessor;
6667
use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler;
6768
use crate::processor::pull_message_processor::PullMessageProcessor;
69+
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
6870
use crate::processor::send_message_processor::SendMessageProcessor;
6971
use crate::processor::BrokerRequestProcessor;
7072
use crate::schedule::schedule_message_service::ScheduleMessageService;
@@ -374,16 +376,16 @@ impl BrokerRuntime {
374376
self.broker_config.clone(),
375377
self.message_store.as_ref().unwrap(),
376378
);
377-
let pull_message_result_handler =
378-
Arc::new(SyncUnsafeCell::new(DefaultPullMessageResultHandler::new(
379+
let mut pull_message_result_handler =
380+
ArcCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new(
379381
Arc::new(self.topic_config_manager.clone()),
380382
Arc::new(self.consumer_offset_manager.clone()),
381383
self.consumer_manager.clone(),
382384
self.broadcast_offset_manager.clone(),
383385
self.broker_stats_manager.clone(),
384386
self.broker_config.clone(),
385387
Arc::new(Default::default()),
386-
)));
388+
)) as Box<dyn PullMessageResultHandler>);
387389
let message_store = Arc::new(self.message_store.as_ref().unwrap().clone());
388390
let pull_message_processor = PullMessageProcessor::new(
389391
pull_message_result_handler.clone(),
@@ -414,11 +416,13 @@ impl BrokerRuntime {
414416
self.broker_config.clone(),
415417
));
416418

417-
unsafe {
418-
(*pull_message_result_handler.get()).set_pull_request_hold_service(Some(Arc::new(
419+
let pull_message_result_handler = pull_message_result_handler.as_mut() as &mut dyn Any;
420+
pull_message_result_handler
421+
.downcast_mut::<DefaultPullMessageResultHandler>()
422+
.unwrap()
423+
.set_pull_request_hold_service(Some(Arc::new(
419424
self.pull_request_hold_service.clone().unwrap(),
420425
)));
421-
}
422426

423427
self.message_store
424428
.as_mut()

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::cell::SyncUnsafeCell;
1918
use std::collections::HashMap;
2019
use std::collections::HashSet;
2120
use std::fmt;
@@ -27,6 +26,7 @@ use std::sync::Arc;
2726
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2827
use rocketmq_common::common::config_manager::ConfigManager;
2928
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
29+
use rocketmq_common::ArcCellWrapper;
3030
use rocketmq_remoting::protocol::DataVersion;
3131
use rocketmq_remoting::protocol::RemotingSerializable;
3232
use rocketmq_store::log_file::MessageStore;
@@ -60,7 +60,7 @@ impl ConsumerOffsetManager {
6060
ConsumerOffsetManager {
6161
broker_config,
6262
consumer_offset_wrapper: ConsumerOffsetWrapper {
63-
data_version: Arc::new(SyncUnsafeCell::new(DataVersion::default())),
63+
data_version: ArcCellWrapper::new(DataVersion::default()),
6464
offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
6565
reset_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
6666
pull_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
@@ -143,7 +143,9 @@ impl ConsumerOffsetManager {
143143
} else {
144144
0
145145
};
146-
unsafe { &mut *self.consumer_offset_wrapper.data_version.get() }
146+
self.consumer_offset_wrapper
147+
.data_version
148+
.mut_from_ref()
147149
.next_version_with(state_machine_version);
148150
}
149151
}
@@ -212,8 +214,8 @@ impl ConfigManager for ConsumerOffsetManager {
212214
.offset_table
213215
.write()
214216
.extend(wrapper.offset_table.read().clone());
215-
let data_version = unsafe { &mut *self.consumer_offset_wrapper.data_version.get() };
216-
*data_version = unsafe { &*wrapper.data_version.get() }.clone();
217+
let data_version = self.consumer_offset_wrapper.data_version.mut_from_ref();
218+
*data_version = wrapper.data_version.as_ref().clone();
217219
}
218220
}
219221
}
@@ -253,9 +255,9 @@ impl ConsumerOffsetManager {
253255
}
254256
}
255257

256-
#[derive(Debug, Default, Clone)]
258+
#[derive(Default, Clone)]
257259
struct ConsumerOffsetWrapper {
258-
data_version: Arc<SyncUnsafeCell<DataVersion>>,
260+
data_version: ArcCellWrapper<DataVersion>,
259261
offset_table: Arc<parking_lot::RwLock<HashMap<String /* topic@group */, HashMap<i32, i64>>>>,
260262
reset_offset_table: Arc<parking_lot::RwLock<HashMap<String, HashMap<i32, i64>>>>,
261263
pull_offset_table:
@@ -287,7 +289,7 @@ impl ConsumerOffsetWrapper {
287289
impl Serialize for ConsumerOffsetWrapper {
288290
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
289291
let mut state = serializer.serialize_struct("ConsumerOffsetWrapper", 5)?;
290-
state.serialize_field("dataVersion", unsafe { &*self.data_version.get() })?;
292+
state.serialize_field("dataVersion", self.data_version.as_ref())?;
291293
state.serialize_field("offsetTable", &*self.offset_table.read())?;
292294
state.serialize_field("resetOffsetTable", &*self.reset_offset_table.read())?;
293295
state.serialize_field("pullOffsetTable", &*self.pull_offset_table.read())?;
@@ -390,7 +392,7 @@ impl<'de> Deserialize<'de> for ConsumerOffsetWrapper {
390392
let pull_offset_table = pull_offset_table.unwrap_or_default();
391393

392394
Ok(ConsumerOffsetWrapper {
393-
data_version: Arc::new(SyncUnsafeCell::new(data_version)),
395+
data_version: ArcCellWrapper::new(data_version),
394396
offset_table: Arc::new(parking_lot::RwLock::new(offset_table)),
395397
reset_offset_table: Arc::new(parking_lot::RwLock::new(reset_offset_table)),
396398
pull_offset_table: Arc::new(parking_lot::RwLock::new(pull_offset_table)),

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ impl TopicRequestHandler {
163163
.broker_runtime_inner()
164164
.register_increment_broker_data(
165165
vec![topic_config],
166-
self.inner.topic_config_manager.data_version().clone(),
166+
self.inner
167+
.topic_config_manager
168+
.data_version()
169+
.as_ref()
170+
.clone(),
167171
)
168172
.await;
169173
}
@@ -261,7 +265,11 @@ impl TopicRequestHandler {
261265
.broker_runtime_inner()
262266
.register_increment_broker_data(
263267
request_body.topic_config_list,
264-
self.inner.topic_config_manager.data_version().clone(),
268+
self.inner
269+
.topic_config_manager
270+
.data_version()
271+
.as_ref()
272+
.clone(),
265273
)
266274
.await;
267275
}
@@ -364,7 +372,13 @@ impl TopicRequestHandler {
364372
.lock()
365373
.clone(),
366374
),
367-
data_version: Some(self.inner.topic_config_manager.data_version().clone()),
375+
data_version: Some(
376+
self.inner
377+
.topic_config_manager
378+
.data_version()
379+
.as_ref()
380+
.clone(),
381+
),
368382
..Default::default()
369383
};
370384
let content = topic_config_and_mapping_serialize_wrapper.to_json();

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::cell::SyncUnsafeCell;
1817
use std::sync::Arc;
1918

2019
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2120
use rocketmq_common::common::constant::PermName;
2221
use rocketmq_common::common::filter::expression_type::ExpressionType;
2322
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
2423
use rocketmq_common::common::FAQUrl;
24+
use rocketmq_common::ArcCellWrapper;
2525
use rocketmq_common::TimeUtils::get_current_millis;
2626
use rocketmq_remoting::code::request_code::RequestCode;
2727
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
@@ -68,7 +68,7 @@ use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager
6868

6969
#[derive(Clone)]
7070
pub struct PullMessageProcessor<MS> {
71-
pull_message_result_handler: Arc<SyncUnsafeCell<dyn PullMessageResultHandler>>,
71+
pull_message_result_handler: ArcCellWrapper<Box<dyn PullMessageResultHandler>>,
7272
broker_config: Arc<BrokerConfig>,
7373
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
7474
topic_config_manager: Arc<TopicConfigManager>,
@@ -84,7 +84,7 @@ pub struct PullMessageProcessor<MS> {
8484

8585
impl<MS> PullMessageProcessor<MS> {
8686
pub fn new(
87-
pull_message_result_handler: Arc<SyncUnsafeCell<dyn PullMessageResultHandler>>,
87+
pull_message_result_handler: ArcCellWrapper<Box<dyn PullMessageResultHandler>>,
8888
broker_config: Arc<BrokerConfig>,
8989
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
9090
topic_config_manager: Arc<TopicConfigManager>,
@@ -758,7 +758,7 @@ where
758758
}
759759
};
760760
if let Some(get_message_result) = get_message_result {
761-
return self.pull_message_result_handler().handle(
761+
return self.pull_message_result_handler.handle(
762762
get_message_result,
763763
request,
764764
request_header,
@@ -776,10 +776,6 @@ where
776776
None
777777
}
778778

779-
fn pull_message_result_handler(&self) -> &dyn PullMessageResultHandler {
780-
unsafe { &*self.pull_message_result_handler.get() }
781-
}
782-
783779
fn query_broadcast_pull_init_offset(
784780
&mut self,
785781
topic: &str,
@@ -848,9 +844,8 @@ where
848844
let command = response.set_opaque(opaque).mark_response_type();
849845
match ctx.upgrade() {
850846
None => {}
851-
Some(ctx) => {
852-
let ctx_ref = unsafe { &mut *ctx.get() };
853-
ctx_ref.write(command).await;
847+
Some(mut ctx) => {
848+
ctx.write(command).await;
854849
}
855850
}
856851
}

rocketmq-broker/src/topic/manager/topic_config_manager.rs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::cell::SyncUnsafeCell;
1918
use std::collections::HashMap;
2019
use std::net::SocketAddr;
2120
use std::sync::Arc;
@@ -29,6 +28,7 @@ use rocketmq_common::common::constant::PermName;
2928
use rocketmq_common::common::mix_all;
3029
use rocketmq_common::common::topic::TopicValidator;
3130
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
31+
use rocketmq_common::ArcCellWrapper;
3232
use rocketmq_common::TopicAttributes::ALL;
3333
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
3434
use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
@@ -45,7 +45,7 @@ use crate::broker_runtime::BrokerRuntimeInner;
4545

4646
pub(crate) struct TopicConfigManager {
4747
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
48-
data_version: Arc<SyncUnsafeCell<DataVersion>>,
48+
data_version: ArcCellWrapper<DataVersion>,
4949
broker_config: Arc<BrokerConfig>,
5050
message_store: Option<DefaultMessageStore>,
5151
topic_config_table_lock: Arc<parking_lot::ReentrantMutex<()>>,
@@ -74,7 +74,7 @@ impl TopicConfigManager {
7474
) -> Self {
7575
let mut manager = Self {
7676
topic_config_table: Arc::new(parking_lot::Mutex::new(HashMap::new())),
77-
data_version: Arc::new(SyncUnsafeCell::new(DataVersion::default())),
77+
data_version: ArcCellWrapper::new(DataVersion::default()),
7878
broker_config,
7979
message_store: None,
8080
topic_config_table_lock: Default::default(),
@@ -231,7 +231,7 @@ impl TopicConfigManager {
231231
topic_queue_mapping_info_map: HashMap<String, TopicQueueMappingInfo>,
232232
) -> TopicConfigAndMappingSerializeWrapper {
233233
if self.broker_config.enable_split_registration {
234-
self.data_version_mut().next_version();
234+
self.data_version.mut_from_ref().next_version();
235235
}
236236
TopicConfigAndMappingSerializeWrapper {
237237
topic_config_table: Some(topic_config_table),
@@ -296,7 +296,7 @@ impl TopicConfigManager {
296296
default_topic, topic_config, remote_address
297297
);
298298
self.put_topic_config(topic_config.clone());
299-
self.data_version_mut().next_version_with(
299+
self.data_version.mut_from_ref().next_version_with(
300300
self.message_store
301301
.as_ref()
302302
.unwrap()
@@ -357,7 +357,7 @@ impl TopicConfigManager {
357357
config.order = is_order;
358358

359359
self.put_topic_config(config.clone());
360-
self.data_version_mut().next_version_with(
360+
self.data_version.mut_from_ref().next_version_with(
361361
self.message_store
362362
.as_ref()
363363
.unwrap()
@@ -410,7 +410,8 @@ impl TopicConfigManager {
410410
} else {
411411
0
412412
};
413-
self.data_version_mut()
413+
self.data_version
414+
.mut_from_ref()
414415
.next_version_with(state_machine_version);
415416
self.persist();
416417
} else {
@@ -441,7 +442,7 @@ impl TopicConfigManager {
441442
}
442443
}
443444

444-
self.data_version_mut().next_version_with(
445+
self.data_version.mut_from_ref().next_version_with(
445446
self.message_store
446447
.as_ref()
447448
.unwrap()
@@ -508,7 +509,7 @@ impl TopicConfigManager {
508509
config.topic_sys_flag = 0;
509510
info!("create new topic {:?}", config);
510511
self.put_topic_config(config.clone());
511-
self.data_version_mut().next_version_with(
512+
self.data_version.mut_from_ref().next_version_with(
512513
self.message_store
513514
.as_ref()
514515
.unwrap()
@@ -530,12 +531,8 @@ impl TopicConfigManager {
530531
self.topic_config_table.lock().contains_key(topic)
531532
}
532533

533-
pub fn data_version(&self) -> &DataVersion {
534-
unsafe { &*self.data_version.get() }
535-
}
536-
537-
fn data_version_mut(&self) -> &mut DataVersion {
538-
unsafe { &mut *self.data_version.get() }
534+
pub fn data_version(&self) -> ArcCellWrapper<DataVersion> {
535+
self.data_version.clone()
539536
}
540537

541538
#[inline]
@@ -551,7 +548,7 @@ impl ConfigManager for TopicConfigManager {
551548

552549
fn encode_pretty(&self, pretty_format: bool) -> String {
553550
let topic_config_table = self.topic_config_table.lock().clone();
554-
let version = self.data_version().clone();
551+
let version = self.data_version().as_ref().clone();
555552
match pretty_format {
556553
true => TopicConfigSerializeWrapper::new(Some(topic_config_table), Some(version))
557554
.to_json_pretty(),
@@ -569,7 +566,7 @@ impl ConfigManager for TopicConfigManager {
569566
let wrapper = SerdeJsonUtils::from_json_str::<TopicConfigSerializeWrapper>(json_string)
570567
.expect("Decode TopicConfigSerializeWrapper from json failed");
571568
if let Some(value) = wrapper.data_version() {
572-
self.data_version_mut().assign_new_one(value);
569+
self.data_version.mut_from_ref().assign_new_one(value);
573570
}
574571
if let Some(map) = wrapper.topic_config_table() {
575572
for (key, value) in map {

0 commit comments

Comments
 (0)