Skip to content

[ISSUE #1107]Replace ArcRefCellWrapper with ArcMut #1109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 15 additions & 16 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
use rocketmq_common::common::constant::PermName;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_common::common::statistics::state_getter::StateGetter;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
Expand All @@ -39,6 +38,7 @@
use rocketmq_remoting::remoting_server::server::RocketMQServer;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::store_enum::StoreType;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use rocketmq_store::log_file::MessageStore;
Expand Down Expand Up @@ -96,7 +96,7 @@
consumer_filter_manager: Arc<ConsumerFilterManager>,
consumer_order_info_manager: Arc<ConsumerOrderInfoManager>,
#[cfg(feature = "local_file_store")]
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
message_store: Option<ArcMut<DefaultMessageStore>>,
#[cfg(feature = "local_file_store")]
broker_stats: Option<Arc<BrokerStats<DefaultMessageStore>>>,
//message_store: Option<Arc<Mutex<LocalFileMessageStore>>>,
Expand All @@ -118,13 +118,12 @@
should_start_time: Arc<AtomicU64>,
is_isolated: Arc<AtomicBool>,
#[cfg(feature = "local_file_store")]
pull_request_hold_service:
Option<ArcRefCellWrapper<PullRequestHoldService<DefaultMessageStore>>>,
pull_request_hold_service: Option<ArcMut<PullRequestHoldService<DefaultMessageStore>>>,
rebalance_lock_manager: Arc<RebalanceLockManager>,
broker_member_group: Arc<BrokerMemberGroup>,
#[cfg(feature = "local_file_store")]
transactional_message_service:
Option<ArcRefCellWrapper<DefaultTransactionalMessageService<DefaultMessageStore>>>,
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
transactional_message_check_listener: Option<Arc<DefaultTransactionalMessageCheckListener>>,
transactional_message_check_service: Option<Arc<TransactionalMessageCheckService>>,
transaction_metrics_flush_service: Option<Arc<TransactionMetricsFlushService>>,
Expand Down Expand Up @@ -335,7 +334,7 @@
async fn initialize_message_store(&mut self) -> bool {
if self.message_store_config.store_type == StoreType::LocalFile {
info!("Use local file as message store");
let mut message_store = ArcRefCellWrapper::new(DefaultMessageStore::new(
let mut message_store = ArcMut::new(DefaultMessageStore::new(

Check warning on line 337 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L337

Added line #L337 was not covered by tests
self.message_store_config.clone(),
self.broker_config.clone(),
self.topic_config_manager.topic_config_table(),
Expand Down Expand Up @@ -441,7 +440,7 @@
self.transactional_message_service.as_ref().unwrap().clone(),
);
let mut pull_message_result_handler =
ArcRefCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new(
ArcMut::new(Box::new(DefaultPullMessageResultHandler::new(

Check warning on line 443 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L443

Added line #L443 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify initialization by removing unnecessary Box.

When wrapping DefaultPullMessageResultHandler with ArcMut, the intermediate Box is unnecessary. ArcMut can own the data directly. Removing Box simplifies the code.

Apply this diff to simplify the initialization:

-let pull_message_result_handler =
-    ArcMut::new(Box::new(DefaultPullMessageResultHandler::new(
+let pull_message_result_handler =
+    ArcMut::new(DefaultPullMessageResultHandler::new(
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ArcMut::new(Box::new(DefaultPullMessageResultHandler::new(
ArcMut::new(DefaultPullMessageResultHandler::new(

self.message_store_config.clone(),
Arc::new(self.topic_config_manager.clone()),
Arc::new(self.consumer_offset_manager.clone()),
Expand All @@ -452,7 +451,7 @@
Arc::new(Default::default()),
)) as Box<dyn PullMessageResultHandler>);
let message_store = self.message_store.clone().unwrap();
let pull_message_processor = ArcRefCellWrapper::new(PullMessageProcessor::new(
let pull_message_processor = ArcMut::new(PullMessageProcessor::new(

Check warning on line 454 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L454

Added line #L454 was not covered by tests
pull_message_result_handler.clone(),
self.broker_config.clone(),
self.subscription_group_manager.clone(),
Expand All @@ -475,7 +474,7 @@
Arc::new(self.topic_config_manager.clone()),
self.message_store.clone().unwrap(),
);
self.pull_request_hold_service = Some(ArcRefCellWrapper::new(PullRequestHoldService::new(
self.pull_request_hold_service = Some(ArcMut::new(PullRequestHoldService::new(

Check warning on line 477 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L477

Added line #L477 was not covered by tests
message_store.clone(),
pull_message_processor.clone(),
self.broker_config.clone(),
Expand Down Expand Up @@ -515,26 +514,26 @@
);

BrokerRequestProcessor {
send_message_processor: ArcRefCellWrapper::new(send_message_processor),
send_message_processor: ArcMut::new(send_message_processor),

Check warning on line 517 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L517

Added line #L517 was not covered by tests
pull_message_processor,
peek_message_processor: Default::default(),
pop_message_processor: Default::default(),
ack_message_processor: Default::default(),
change_invisible_time_processor: Default::default(),
notification_processor: Default::default(),
polling_info_processor: Default::default(),
reply_message_processor: ArcRefCellWrapper::new(reply_message_processor),
admin_broker_processor: ArcRefCellWrapper::new(admin_broker_processor),
client_manage_processor: ArcRefCellWrapper::new(ClientManageProcessor::new(
reply_message_processor: ArcMut::new(reply_message_processor),
admin_broker_processor: ArcMut::new(admin_broker_processor),
client_manage_processor: ArcMut::new(ClientManageProcessor::new(

Check warning on line 527 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L525-L527

Added lines #L525 - L527 were not covered by tests
self.broker_config.clone(),
self.producer_manager.clone(),
self.consumer_manager.clone(),
self.topic_config_manager.clone(),
self.subscription_group_manager.clone(),
)),
consumer_manage_processor: ArcRefCellWrapper::new(consumer_manage_processor),
consumer_manage_processor: ArcMut::new(consumer_manage_processor),

Check warning on line 534 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L534

Added line #L534 was not covered by tests
query_assignment_processor: Default::default(),
query_message_processor: ArcRefCellWrapper::new(query_message_processor),
query_message_processor: ArcMut::new(query_message_processor),

Check warning on line 536 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L536

Added line #L536 was not covered by tests
end_transaction_processor: Default::default(),
}
}
Expand Down Expand Up @@ -679,7 +678,7 @@
self.topic_config_manager.clone()
);
let service = DefaultTransactionalMessageService::new(bridge);
self.transactional_message_service = Some(ArcRefCellWrapper::new(service));
self.transactional_message_service = Some(ArcMut::new(service));

Check warning on line 681 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L681

Added line #L681 was not covered by tests
}
}
self.transactional_message_check_listener =
Expand Down
9 changes: 3 additions & 6 deletions rocketmq-broker/src/hook/check_before_put_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_result::PutMessageResult;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use rocketmq_store::hook::put_message_hook::PutMessageHook;
Expand All @@ -27,15 +27,12 @@
use crate::util::hook_utils::HookUtils;

pub struct CheckBeforePutMessageHook<MS> {
message_store: ArcRefCellWrapper<MS>,
message_store: ArcMut<MS>,
message_store_config: Arc<MessageStoreConfig>,
}

impl<MS: MessageStore> CheckBeforePutMessageHook<MS> {
pub fn new(
message_store: ArcRefCellWrapper<MS>,
message_store_config: Arc<MessageStoreConfig>,
) -> Self {
pub fn new(message_store: ArcMut<MS>, message_store_config: Arc<MessageStoreConfig>) -> Self {

Check warning on line 35 in rocketmq-broker/src/hook/check_before_put_message.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/hook/check_before_put_message.rs#L35

Added line #L35 was not covered by tests
Self {
message_store,
message_store_config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
use std::sync::Arc;

use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_rust::ArcMut;
use rocketmq_store::consume_queue::consume_queue_ext::CqExtUnit;
use rocketmq_store::log_file::MessageStore;
use tokio::sync::Notify;
Expand All @@ -36,8 +36,8 @@

pub struct PullRequestHoldService<MS> {
pull_request_table: Arc<parking_lot::RwLock<HashMap<String, ManyPullRequest>>>,
pull_message_processor: ArcRefCellWrapper<PullMessageProcessor<MS>>,
message_store: ArcRefCellWrapper<MS>,
pull_message_processor: ArcMut<PullMessageProcessor<MS>>,
message_store: ArcMut<MS>,
broker_config: Arc<BrokerConfig>,
shutdown: Arc<Notify>,
}
Expand All @@ -47,8 +47,8 @@
MS: MessageStore + Send + Sync,
{
pub fn new(
message_store: ArcRefCellWrapper<MS>,
pull_message_processor: ArcRefCellWrapper<PullMessageProcessor<MS>>,
message_store: ArcMut<MS>,
pull_message_processor: ArcMut<PullMessageProcessor<MS>>,
broker_config: Arc<BrokerConfig>,
) -> Self {
PullRequestHoldService {
Expand All @@ -66,7 +66,7 @@
where
MS: MessageStore + Send + Sync,
{
pub fn start(&mut self, this: ArcRefCellWrapper<Self>) {
pub fn start(&mut self, this: ArcMut<Self>) {

Check warning on line 69 in rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs#L69

Added line #L69 was not covered by tests
tokio::spawn(async move {
loop {
let handle_future = if this.broker_config.long_polling_enable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
use std::collections::HashMap;

use rocketmq_common::ArcRefCellWrapper;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_arriving_listener::MessageArrivingListener;
use rocketmq_store::log_file::MessageStore;

use crate::long_polling::long_polling_service::pull_request_hold_service::PullRequestHoldService;

pub struct NotifyMessageArrivingListener<MS> {
pull_request_hold_service: ArcRefCellWrapper<PullRequestHoldService<MS>>,
pull_request_hold_service: ArcMut<PullRequestHoldService<MS>>,
}

impl<MS> NotifyMessageArrivingListener<MS>
where
MS: MessageStore + Send + Sync,
{
pub fn new(pull_request_hold_service: ArcRefCellWrapper<PullRequestHoldService<MS>>) -> Self {
pub fn new(pull_request_hold_service: ArcMut<PullRequestHoldService<MS>>) -> Self {

Check warning on line 33 in rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs#L33

Added line #L33 was not covered by tests
Self {
pull_request_hold_service,
}
Expand Down
17 changes: 7 additions & 10 deletions rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_remoting::protocol::DataVersion;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_rust::ArcMut;
use rocketmq_store::log_file::MessageStore;
use rocketmq_store::message_store::default_message_store::DefaultMessageStore;
use serde::de;
Expand All @@ -49,18 +49,18 @@
pub(crate) struct ConsumerOffsetManager {
pub(crate) broker_config: Arc<BrokerConfig>,
consumer_offset_wrapper: ConsumerOffsetWrapper,
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
message_store: Option<ArcMut<DefaultMessageStore>>,

Check warning on line 52 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L52

Added line #L52 was not covered by tests
}

impl ConsumerOffsetManager {
pub fn new(
broker_config: Arc<BrokerConfig>,
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
message_store: Option<ArcMut<DefaultMessageStore>>,
) -> Self {
ConsumerOffsetManager {
broker_config,
consumer_offset_wrapper: ConsumerOffsetWrapper {
data_version: ArcRefCellWrapper::new(DataVersion::default()),
data_version: ArcMut::new(DataVersion::default()),

Check warning on line 63 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L63

Added line #L63 was not covered by tests
offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
reset_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
pull_offset_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
Expand All @@ -69,10 +69,7 @@
message_store,
}
}
pub fn set_message_store(
&mut self,
message_store: Option<ArcRefCellWrapper<DefaultMessageStore>>,
) {
pub fn set_message_store(&mut self, message_store: Option<ArcMut<DefaultMessageStore>>) {

Check warning on line 72 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L72

Added line #L72 was not covered by tests
self.message_store = message_store;
}
}
Expand Down Expand Up @@ -273,7 +270,7 @@

#[derive(Default, Clone)]
struct ConsumerOffsetWrapper {
data_version: ArcRefCellWrapper<DataVersion>,
data_version: ArcMut<DataVersion>,

Check warning on line 273 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L273

Added line #L273 was not covered by tests
offset_table: Arc<parking_lot::RwLock<HashMap<String /* topic@group */, HashMap<i32, i64>>>>,
reset_offset_table: Arc<parking_lot::RwLock<HashMap<String, HashMap<i32, i64>>>>,
pull_offset_table:
Expand Down Expand Up @@ -408,7 +405,7 @@
let pull_offset_table = pull_offset_table.unwrap_or_default();

Ok(ConsumerOffsetWrapper {
data_version: ArcRefCellWrapper::new(data_version),
data_version: ArcMut::new(data_version),

Check warning on line 408 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L408

Added line #L408 was not covered by tests
offset_table: Arc::new(parking_lot::RwLock::new(offset_table)),
reset_offset_table: Arc::new(parking_lot::RwLock::new(reset_offset_table)),
pull_offset_table: Arc::new(parking_lot::RwLock::new(pull_offset_table)),
Expand Down
10 changes: 5 additions & 5 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::utils::crc32_utils;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
use rocketmq_remoting::code::request_code::RequestCode;
Expand All @@ -50,6 +49,7 @@
use rocketmq_remoting::rpc::rpc_client_impl::RpcClientImpl;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
use rocketmq_remoting::runtime::RPCHook;
use rocketmq_rust::ArcMut;
use tracing::debug;
use tracing::error;
use tracing::info;
Expand All @@ -59,15 +59,15 @@
use crate::Result;

pub struct BrokerOuterAPI {
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
remoting_client: ArcMut<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
name_server_address: Option<String>,
rpc_client: RpcClientImpl,
client_metadata: ClientMetadata,
}

impl BrokerOuterAPI {
pub fn new(tokio_client_config: Arc<TokioClientConfig>) -> Self {
let client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(
let client = ArcMut::new(RocketmqDefaultClient::new(

Check warning on line 70 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L70

Added line #L70 was not covered by tests
tokio_client_config,
DefaultRemotingRequestProcessor,
));
Expand All @@ -84,7 +84,7 @@
tokio_client_config: Arc<TokioClientConfig>,
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
) -> Self {
let mut client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(
let mut client = ArcMut::new(RocketmqDefaultClient::new(

Check warning on line 87 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L87

Added line #L87 was not covered by tests
tokio_client_config,
DefaultRemotingRequestProcessor,
));
Expand Down Expand Up @@ -123,7 +123,7 @@

impl BrokerOuterAPI {
pub async fn start(&self) {
let wrapper = ArcRefCellWrapper::downgrade(&self.remoting_client);
let wrapper = ArcMut::downgrade(&self.remoting_client);

Check warning on line 126 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L126

Added line #L126 was not covered by tests
self.remoting_client.start(wrapper).await;
}

Expand Down
32 changes: 16 additions & 16 deletions rocketmq-broker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_remoting::runtime::processor::RequestProcessor;
use rocketmq_remoting::Result;
use rocketmq_rust::ArcMut;
use rocketmq_store::log_file::MessageStore;
use tracing::info;

Expand Down Expand Up @@ -61,21 +61,21 @@ pub(crate) mod reply_message_processor;
pub(crate) mod send_message_processor;

pub struct BrokerRequestProcessor<MS, TS> {
pub(crate) send_message_processor: ArcRefCellWrapper<SendMessageProcessor<MS, TS>>,
pub(crate) pull_message_processor: ArcRefCellWrapper<PullMessageProcessor<MS>>,
pub(crate) peek_message_processor: ArcRefCellWrapper<PeekMessageProcessor>,
pub(crate) pop_message_processor: ArcRefCellWrapper<PopMessageProcessor>,
pub(crate) ack_message_processor: ArcRefCellWrapper<AckMessageProcessor>,
pub(crate) change_invisible_time_processor: ArcRefCellWrapper<ChangeInvisibleTimeProcessor>,
pub(crate) notification_processor: ArcRefCellWrapper<NotificationProcessor>,
pub(crate) polling_info_processor: ArcRefCellWrapper<PollingInfoProcessor>,
pub(crate) reply_message_processor: ArcRefCellWrapper<ReplyMessageProcessor<MS, TS>>,
pub(crate) query_message_processor: ArcRefCellWrapper<QueryMessageProcessor<MS>>,
pub(crate) client_manage_processor: ArcRefCellWrapper<ClientManageProcessor<MS>>,
pub(crate) consumer_manage_processor: ArcRefCellWrapper<ConsumerManageProcessor<MS>>,
pub(crate) query_assignment_processor: ArcRefCellWrapper<QueryAssignmentProcessor>,
pub(crate) end_transaction_processor: ArcRefCellWrapper<EndTransactionProcessor>,
pub(crate) admin_broker_processor: ArcRefCellWrapper<AdminBrokerProcessor>,
pub(crate) send_message_processor: ArcMut<SendMessageProcessor<MS, TS>>,
pub(crate) pull_message_processor: ArcMut<PullMessageProcessor<MS>>,
pub(crate) peek_message_processor: ArcMut<PeekMessageProcessor>,
pub(crate) pop_message_processor: ArcMut<PopMessageProcessor>,
pub(crate) ack_message_processor: ArcMut<AckMessageProcessor>,
pub(crate) change_invisible_time_processor: ArcMut<ChangeInvisibleTimeProcessor>,
pub(crate) notification_processor: ArcMut<NotificationProcessor>,
pub(crate) polling_info_processor: ArcMut<PollingInfoProcessor>,
pub(crate) reply_message_processor: ArcMut<ReplyMessageProcessor<MS, TS>>,
pub(crate) query_message_processor: ArcMut<QueryMessageProcessor<MS>>,
pub(crate) client_manage_processor: ArcMut<ClientManageProcessor<MS>>,
pub(crate) consumer_manage_processor: ArcMut<ConsumerManageProcessor<MS>>,
pub(crate) query_assignment_processor: ArcMut<QueryAssignmentProcessor>,
pub(crate) end_transaction_processor: ArcMut<EndTransactionProcessor>,
pub(crate) admin_broker_processor: ArcMut<AdminBrokerProcessor>,
}
impl<MS, TS> Clone for BrokerRequestProcessor<MS, TS> {
fn clone(&self) -> Self {
Expand Down
Loading
Loading