Skip to content

Commit a18b42d

Browse files
committed
[ISSUE #1107]Replace ArcRefCellWrapper with ArcMut
1 parent 2e8e9ed commit a18b42d

File tree

74 files changed

+456
-743
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+456
-743
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rocketmq-broker/src/broker_runtime.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use rocketmq_common::common::config_manager::ConfigManager;
2727
use rocketmq_common::common::constant::PermName;
2828
use rocketmq_common::common::server::config::ServerConfig;
2929
use rocketmq_common::common::statistics::state_getter::StateGetter;
30-
use rocketmq_common::ArcRefCellWrapper;
3130
use rocketmq_common::TimeUtils::get_current_millis;
3231
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
3332
use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
@@ -39,6 +38,7 @@ use rocketmq_remoting::protocol::DataVersion;
3938
use rocketmq_remoting::remoting_server::server::RocketMQServer;
4039
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
4140
use rocketmq_runtime::RocketMQRuntime;
41+
use rocketmq_rust::ArcMut;
4242
use rocketmq_store::base::store_enum::StoreType;
4343
use rocketmq_store::config::message_store_config::MessageStoreConfig;
4444
use rocketmq_store::log_file::MessageStore;
@@ -96,7 +96,7 @@ pub(crate) struct BrokerRuntime {
9696
consumer_filter_manager: Arc<ConsumerFilterManager>,
9797
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
9898
#[cfg(feature = "local_file_store")]
99-
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
99+
message_store: Option<ArcMut<DefaultMessageStore>>,
100100
#[cfg(feature = "local_file_store")]
101101
broker_stats: Option<Arc<BrokerStats<DefaultMessageStore>>>,
102102
//message_store: Option<Arc<Mutex<LocalFileMessageStore>>>,
@@ -118,13 +118,12 @@ pub(crate) struct BrokerRuntime {
118118
should_start_time: Arc<AtomicU64>,
119119
is_isolated: Arc<AtomicBool>,
120120
#[cfg(feature = "local_file_store")]
121-
pull_request_hold_service:
122-
Option<ArcRefCellWrapper<PullRequestHoldService<DefaultMessageStore>>>,
121+
pull_request_hold_service: Option<ArcMut<PullRequestHoldService<DefaultMessageStore>>>,
123122
rebalance_lock_manager: Arc<RebalanceLockManager>,
124123
broker_member_group: Arc<BrokerMemberGroup>,
125124
#[cfg(feature = "local_file_store")]
126125
transactional_message_service:
127-
Option<ArcRefCellWrapper<DefaultTransactionalMessageService<DefaultMessageStore>>>,
126+
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
128127
transactional_message_check_listener: Option<Arc<DefaultTransactionalMessageCheckListener>>,
129128
transactional_message_check_service: Option<Arc<TransactionalMessageCheckService>>,
130129
transaction_metrics_flush_service: Option<Arc<TransactionMetricsFlushService>>,
@@ -335,7 +334,7 @@ impl BrokerRuntime {
335334
async fn initialize_message_store(&mut self) -> bool {
336335
if self.message_store_config.store_type == StoreType::LocalFile {
337336
info!("Use local file as message store");
338-
let mut message_store = ArcRefCellWrapper::new(DefaultMessageStore::new(
337+
let mut message_store = ArcMut::new(DefaultMessageStore::new(
339338
self.message_store_config.clone(),
340339
self.broker_config.clone(),
341340
self.topic_config_manager.topic_config_table(),
@@ -441,7 +440,7 @@ impl BrokerRuntime {
441440
self.transactional_message_service.as_ref().unwrap().clone(),
442441
);
443442
let mut pull_message_result_handler =
444-
ArcRefCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new(
443+
ArcMut::new(Box::new(DefaultPullMessageResultHandler::new(
445444
self.message_store_config.clone(),
446445
Arc::new(self.topic_config_manager.clone()),
447446
Arc::new(self.consumer_offset_manager.clone()),
@@ -452,7 +451,7 @@ impl BrokerRuntime {
452451
Arc::new(Default::default()),
453452
)) as Box<dyn PullMessageResultHandler>);
454453
let message_store = self.message_store.clone().unwrap();
455-
let pull_message_processor = ArcRefCellWrapper::new(PullMessageProcessor::new(
454+
let pull_message_processor = ArcMut::new(PullMessageProcessor::new(
456455
pull_message_result_handler.clone(),
457456
self.broker_config.clone(),
458457
self.subscription_group_manager.clone(),
@@ -475,7 +474,7 @@ impl BrokerRuntime {
475474
Arc::new(self.topic_config_manager.clone()),
476475
self.message_store.clone().unwrap(),
477476
);
478-
self.pull_request_hold_service = Some(ArcRefCellWrapper::new(PullRequestHoldService::new(
477+
self.pull_request_hold_service = Some(ArcMut::new(PullRequestHoldService::new(
479478
message_store.clone(),
480479
pull_message_processor.clone(),
481480
self.broker_config.clone(),
@@ -515,26 +514,26 @@ impl BrokerRuntime {
515514
);
516515

517516
BrokerRequestProcessor {
518-
send_message_processor: ArcRefCellWrapper::new(send_message_processor),
517+
send_message_processor: ArcMut::new(send_message_processor),
519518
pull_message_processor,
520519
peek_message_processor: Default::default(),
521520
pop_message_processor: Default::default(),
522521
ack_message_processor: Default::default(),
523522
change_invisible_time_processor: Default::default(),
524523
notification_processor: Default::default(),
525524
polling_info_processor: Default::default(),
526-
reply_message_processor: ArcRefCellWrapper::new(reply_message_processor),
527-
admin_broker_processor: ArcRefCellWrapper::new(admin_broker_processor),
528-
client_manage_processor: ArcRefCellWrapper::new(ClientManageProcessor::new(
525+
reply_message_processor: ArcMut::new(reply_message_processor),
526+
admin_broker_processor: ArcMut::new(admin_broker_processor),
527+
client_manage_processor: ArcMut::new(ClientManageProcessor::new(
529528
self.broker_config.clone(),
530529
self.producer_manager.clone(),
531530
self.consumer_manager.clone(),
532531
self.topic_config_manager.clone(),
533532
self.subscription_group_manager.clone(),
534533
)),
535-
consumer_manage_processor: ArcRefCellWrapper::new(consumer_manage_processor),
534+
consumer_manage_processor: ArcMut::new(consumer_manage_processor),
536535
query_assignment_processor: Default::default(),
537-
query_message_processor: ArcRefCellWrapper::new(query_message_processor),
536+
query_message_processor: ArcMut::new(query_message_processor),
538537
end_transaction_processor: Default::default(),
539538
}
540539
}
@@ -679,7 +678,7 @@ impl BrokerRuntime {
679678
self.topic_config_manager.clone()
680679
);
681680
let service = DefaultTransactionalMessageService::new(bridge);
682-
self.transactional_message_service = Some(ArcRefCellWrapper::new(service));
681+
self.transactional_message_service = Some(ArcMut::new(service));
683682
}
684683
}
685684
self.transactional_message_check_listener =

rocketmq-broker/src/hook/check_before_put_message.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::ops::Deref;
1818
use std::sync::Arc;
1919

2020
use rocketmq_common::common::message::message_ext::MessageExt;
21-
use rocketmq_common::ArcRefCellWrapper;
21+
use rocketmq_rust::ArcMut;
2222
use rocketmq_store::base::message_result::PutMessageResult;
2323
use rocketmq_store::config::message_store_config::MessageStoreConfig;
2424
use rocketmq_store::hook::put_message_hook::PutMessageHook;
@@ -27,15 +27,12 @@ use rocketmq_store::log_file::MessageStore;
2727
use crate::util::hook_utils::HookUtils;
2828

2929
pub struct CheckBeforePutMessageHook<MS> {
30-
message_store: ArcRefCellWrapper<MS>,
30+
message_store: ArcMut<MS>,
3131
message_store_config: Arc<MessageStoreConfig>,
3232
}
3333

3434
impl<MS: MessageStore> CheckBeforePutMessageHook<MS> {
35-
pub fn new(
36-
message_store: ArcRefCellWrapper<MS>,
37-
message_store_config: Arc<MessageStoreConfig>,
38-
) -> Self {
35+
pub fn new(message_store: ArcMut<MS>, message_store_config: Arc<MessageStoreConfig>) -> Self {
3936
Self {
4037
message_store,
4138
message_store_config,

rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use std::collections::HashMap;
1919
use std::sync::Arc;
2020

2121
use rocketmq_common::common::broker::broker_config::BrokerConfig;
22-
use rocketmq_common::ArcRefCellWrapper;
2322
use rocketmq_common::TimeUtils::get_current_millis;
23+
use rocketmq_rust::ArcMut;
2424
use rocketmq_store::consume_queue::consume_queue_ext::CqExtUnit;
2525
use rocketmq_store::log_file::MessageStore;
2626
use tokio::sync::Notify;
@@ -36,8 +36,8 @@ const TOPIC_QUEUE_ID_SEPARATOR: &str = "@";
3636

3737
pub struct PullRequestHoldService<MS> {
3838
pull_request_table: Arc<parking_lot::RwLock<HashMap<String, ManyPullRequest>>>,
39-
pull_message_processor: ArcRefCellWrapper<PullMessageProcessor<MS>>,
40-
message_store: ArcRefCellWrapper<MS>,
39+
pull_message_processor: ArcMut<PullMessageProcessor<MS>>,
40+
message_store: ArcMut<MS>,
4141
broker_config: Arc<BrokerConfig>,
4242
shutdown: Arc<Notify>,
4343
}
@@ -47,8 +47,8 @@ where
4747
MS: MessageStore + Send + Sync,
4848
{
4949
pub fn new(
50-
message_store: ArcRefCellWrapper<MS>,
51-
pull_message_processor: ArcRefCellWrapper<PullMessageProcessor<MS>>,
50+
message_store: ArcMut<MS>,
51+
pull_message_processor: ArcMut<PullMessageProcessor<MS>>,
5252
broker_config: Arc<BrokerConfig>,
5353
) -> Self {
5454
PullRequestHoldService {
@@ -66,7 +66,7 @@ impl<MS> PullRequestHoldService<MS>
6666
where
6767
MS: MessageStore + Send + Sync,
6868
{
69-
pub fn start(&mut self, this: ArcRefCellWrapper<Self>) {
69+
pub fn start(&mut self, this: ArcMut<Self>) {
7070
tokio::spawn(async move {
7171
loop {
7272
let handle_future = if this.broker_config.long_polling_enable {

rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@
1616
*/
1717
use std::collections::HashMap;
1818

19-
use rocketmq_common::ArcRefCellWrapper;
19+
use rocketmq_rust::ArcMut;
2020
use rocketmq_store::base::message_arriving_listener::MessageArrivingListener;
2121
use rocketmq_store::log_file::MessageStore;
2222

2323
use crate::long_polling::long_polling_service::pull_request_hold_service::PullRequestHoldService;
2424

2525
pub struct NotifyMessageArrivingListener<MS> {
26-
pull_request_hold_service: ArcRefCellWrapper<PullRequestHoldService<MS>>,
26+
pull_request_hold_service: ArcMut<PullRequestHoldService<MS>>,
2727
}
2828

2929
impl<MS> NotifyMessageArrivingListener<MS>
3030
where
3131
MS: MessageStore + Send + Sync,
3232
{
33-
pub fn new(pull_request_hold_service: ArcRefCellWrapper<PullRequestHoldService<MS>>) -> Self {
33+
pub fn new(pull_request_hold_service: ArcMut<PullRequestHoldService<MS>>) -> Self {
3434
Self {
3535
pull_request_hold_service,
3636
}

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use std::sync::Arc;
2626
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2727
use rocketmq_common::common::config_manager::ConfigManager;
2828
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
29-
use rocketmq_common::ArcRefCellWrapper;
3029
use rocketmq_remoting::protocol::DataVersion;
3130
use rocketmq_remoting::protocol::RemotingSerializable;
31+
use rocketmq_rust::ArcMut;
3232
use rocketmq_store::log_file::MessageStore;
3333
use rocketmq_store::message_store::default_message_store::DefaultMessageStore;
3434
use serde::de;
@@ -49,18 +49,18 @@ pub const TOPIC_GROUP_SEPARATOR: &str = "@";
4949
pub(crate) struct ConsumerOffsetManager {
5050
pub(crate) broker_config: Arc<BrokerConfig>,
5151
consumer_offset_wrapper: ConsumerOffsetWrapper,
52-
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
52+
message_store: Option<ArcMut<DefaultMessageStore>>,
5353
}
5454

5555
impl ConsumerOffsetManager {
5656
pub fn new(
5757
broker_config: Arc<BrokerConfig>,
58-
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
58+
message_store: Option<ArcMut<DefaultMessageStore>>,
5959
) -> Self {
6060
ConsumerOffsetManager {
6161
broker_config,
6262
consumer_offset_wrapper: ConsumerOffsetWrapper {
63-
data_version: ArcRefCellWrapper::new(DataVersion::default()),
63+
data_version: ArcMut::new(DataVersion::default()),
6464
offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
6565
reset_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
6666
pull_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
@@ -69,10 +69,7 @@ impl ConsumerOffsetManager {
6969
message_store,
7070
}
7171
}
72-
pub fn set_message_store(
73-
&mut self,
74-
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
75-
) {
72+
pub fn set_message_store(&mut self, message_store: Option<ArcMut<DefaultMessageStore>>) {
7673
self.message_store = message_store;
7774
}
7875
}
@@ -273,7 +270,7 @@ impl ConsumerOffsetManager {
273270

274271
#[derive(Default, Clone)]
275272
struct ConsumerOffsetWrapper {
276-
data_version: ArcRefCellWrapper<DataVersion>,
273+
data_version: ArcMut<DataVersion>,
277274
offset_table: Arc<parking_lot::RwLock<HashMap<String /* topic@group */, HashMap<i32, i64>>>>,
278275
reset_offset_table: Arc<parking_lot::RwLock<HashMap<String, HashMap<i32, i64>>>>,
279276
pull_offset_table:
@@ -408,7 +405,7 @@ impl<'de> Deserialize<'de> for ConsumerOffsetWrapper {
408405
let pull_offset_table = pull_offset_table.unwrap_or_default();
409406

410407
Ok(ConsumerOffsetWrapper {
411-
data_version: ArcRefCellWrapper::new(data_version),
408+
data_version: ArcMut::new(data_version),
412409
offset_table: Arc::new(parking_lot::RwLock::new(offset_table)),
413410
reset_offset_table: Arc::new(parking_lot::RwLock::new(reset_offset_table)),
414411
pull_offset_table: Arc::new(parking_lot::RwLock::new(pull_offset_table)),

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use rocketmq_common::common::config::TopicConfig;
2424
use rocketmq_common::common::message::message_queue::MessageQueue;
2525
use rocketmq_common::utils::crc32_utils;
2626
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
27-
use rocketmq_common::ArcRefCellWrapper;
2827
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
2928
use rocketmq_remoting::clients::RemotingClient;
3029
use rocketmq_remoting::code::request_code::RequestCode;
@@ -50,6 +49,7 @@ use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
5049
use rocketmq_remoting::rpc::rpc_client_impl::RpcClientImpl;
5150
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
5251
use rocketmq_remoting::runtime::RPCHook;
52+
use rocketmq_rust::ArcMut;
5353
use tracing::debug;
5454
use tracing::error;
5555
use tracing::info;
@@ -59,15 +59,15 @@ use crate::error::BrokerError::BrokerClientError;
5959
use crate::Result;
6060

6161
pub struct BrokerOuterAPI {
62-
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
62+
remoting_client: ArcMut<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
6363
name_server_address: Option<String>,
6464
rpc_client: RpcClientImpl,
6565
client_metadata: ClientMetadata,
6666
}
6767

6868
impl BrokerOuterAPI {
6969
pub fn new(tokio_client_config: Arc<TokioClientConfig>) -> Self {
70-
let client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(
70+
let client = ArcMut::new(RocketmqDefaultClient::new(
7171
tokio_client_config,
7272
DefaultRemotingRequestProcessor,
7373
));
@@ -84,7 +84,7 @@ impl BrokerOuterAPI {
8484
tokio_client_config: Arc<TokioClientConfig>,
8585
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
8686
) -> Self {
87-
let mut client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(
87+
let mut client = ArcMut::new(RocketmqDefaultClient::new(
8888
tokio_client_config,
8989
DefaultRemotingRequestProcessor,
9090
));
@@ -123,7 +123,7 @@ impl BrokerOuterAPI {
123123

124124
impl BrokerOuterAPI {
125125
pub async fn start(&self) {
126-
let wrapper = ArcRefCellWrapper::downgrade(&self.remoting_client);
126+
let wrapper = ArcMut::downgrade(&self.remoting_client);
127127
self.remoting_client.start(wrapper).await;
128128
}
129129

rocketmq-broker/src/processor.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use rocketmq_common::ArcRefCellWrapper;
1817
use rocketmq_remoting::code::request_code::RequestCode;
1918
use rocketmq_remoting::net::channel::Channel;
2019
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2120
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
2221
use rocketmq_remoting::runtime::processor::RequestProcessor;
2322
use rocketmq_remoting::Result;
23+
use rocketmq_rust::ArcMut;
2424
use rocketmq_store::log_file::MessageStore;
2525
use tracing::info;
2626

@@ -61,21 +61,21 @@ pub(crate) mod reply_message_processor;
6161
pub(crate) mod send_message_processor;
6262

6363
pub struct BrokerRequestProcessor<MS, TS> {
64-
pub(crate) send_message_processor: ArcRefCellWrapper<SendMessageProcessor<MS, TS>>,
65-
pub(crate) pull_message_processor: ArcRefCellWrapper<PullMessageProcessor<MS>>,
66-
pub(crate) peek_message_processor: ArcRefCellWrapper<PeekMessageProcessor>,
67-
pub(crate) pop_message_processor: ArcRefCellWrapper<PopMessageProcessor>,
68-
pub(crate) ack_message_processor: ArcRefCellWrapper<AckMessageProcessor>,
69-
pub(crate) change_invisible_time_processor: ArcRefCellWrapper<ChangeInvisibleTimeProcessor>,
70-
pub(crate) notification_processor: ArcRefCellWrapper<NotificationProcessor>,
71-
pub(crate) polling_info_processor: ArcRefCellWrapper<PollingInfoProcessor>,
72-
pub(crate) reply_message_processor: ArcRefCellWrapper<ReplyMessageProcessor<MS, TS>>,
73-
pub(crate) query_message_processor: ArcRefCellWrapper<QueryMessageProcessor<MS>>,
74-
pub(crate) client_manage_processor: ArcRefCellWrapper<ClientManageProcessor<MS>>,
75-
pub(crate) consumer_manage_processor: ArcRefCellWrapper<ConsumerManageProcessor<MS>>,
76-
pub(crate) query_assignment_processor: ArcRefCellWrapper<QueryAssignmentProcessor>,
77-
pub(crate) end_transaction_processor: ArcRefCellWrapper<EndTransactionProcessor>,
78-
pub(crate) admin_broker_processor: ArcRefCellWrapper<AdminBrokerProcessor>,
64+
pub(crate) send_message_processor: ArcMut<SendMessageProcessor<MS, TS>>,
65+
pub(crate) pull_message_processor: ArcMut<PullMessageProcessor<MS>>,
66+
pub(crate) peek_message_processor: ArcMut<PeekMessageProcessor>,
67+
pub(crate) pop_message_processor: ArcMut<PopMessageProcessor>,
68+
pub(crate) ack_message_processor: ArcMut<AckMessageProcessor>,
69+
pub(crate) change_invisible_time_processor: ArcMut<ChangeInvisibleTimeProcessor>,
70+
pub(crate) notification_processor: ArcMut<NotificationProcessor>,
71+
pub(crate) polling_info_processor: ArcMut<PollingInfoProcessor>,
72+
pub(crate) reply_message_processor: ArcMut<ReplyMessageProcessor<MS, TS>>,
73+
pub(crate) query_message_processor: ArcMut<QueryMessageProcessor<MS>>,
74+
pub(crate) client_manage_processor: ArcMut<ClientManageProcessor<MS>>,
75+
pub(crate) consumer_manage_processor: ArcMut<ConsumerManageProcessor<MS>>,
76+
pub(crate) query_assignment_processor: ArcMut<QueryAssignmentProcessor>,
77+
pub(crate) end_transaction_processor: ArcMut<EndTransactionProcessor>,
78+
pub(crate) admin_broker_processor: ArcMut<AdminBrokerProcessor>,
7979
}
8080
impl<MS, TS> Clone for BrokerRequestProcessor<MS, TS> {
8181
fn clone(&self) -> Self {

0 commit comments

Comments
 (0)