-
Notifications
You must be signed in to change notification settings - Fork 161
[ISSUE #994]⚡️Optimize RocketmqDefaultClient code #995
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -15,12 +15,14 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::sync::Weak; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use dns_lookup::lookup_host; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_common::common::broker::broker_config::BrokerIdentity; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_common::common::config::TopicConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_common::utils::crc32_utils; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_common::ArcRefCellWrapper; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_remoting::clients::RemotingClient; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_remoting::code::request_code::RequestCode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -46,18 +48,19 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::info; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[derive(Clone)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub struct BrokerOuterAPI { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
remoting_client: RocketmqDefaultClient, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
name_server_address: Option<String>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rpc_client: RpcClientImpl, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
client_metadata: ClientMetadata, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
impl BrokerOuterAPI { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub fn new(tokio_client_config: Arc<TokioClientConfig>) -> Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let client = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RocketmqDefaultClient::new(tokio_client_config, DefaultRemotingRequestProcessor); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let client = ArcRefCellWrapper::new(RocketmqDefaultClient::new( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio_client_config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DefaultRemotingRequestProcessor, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let client_metadata = ClientMetadata::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
remoting_client: client.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -71,8 +74,10 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio_client_config: Arc<TokioClientConfig>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rpc_hook: Option<Arc<Box<dyn RPCHook>>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut client = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RocketmqDefaultClient::new(tokio_client_config, DefaultRemotingRequestProcessor); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut client = ArcRefCellWrapper::new(RocketmqDefaultClient::new( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio_client_config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DefaultRemotingRequestProcessor, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let client_metadata = ClientMetadata::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Some(rpc_hook) = rpc_hook { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
client.register_rpc_hook(rpc_hook); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -108,7 +113,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
impl BrokerOuterAPI { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub async fn start(&self) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.remoting_client.start().await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let wrapper = ArcRefCellWrapper::downgrade(&self.remoting_client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.remoting_client.start(wrapper).await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub async fn update_name_server_address_list(&self, addrs: String) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -143,6 +149,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
compressed: bool, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
heartbeat_timeout_millis: Option<i64>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_broker_identity: BrokerIdentity, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this: Weak<Self>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> Vec<RegisterBrokerResult> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let name_server_address_list = self.remoting_client.get_available_name_srv_list(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut register_broker_result_list = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -173,11 +180,21 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let cloned_body = body.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let cloned_header = request_header.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let addr = namesrv_addr.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let outer_api = self.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let outer_api = this.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let join_handle = tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
outer_api | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Some(outer_api) = outer_api.upgrade() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
outer_api | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.register_broker( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addr, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
oneway, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
timeout_mills, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloned_header, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloned_body, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+183
to
+197
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add Logging for Failed Weak Reference Upgrades In the asynchronous task within You can modify the code as follows to add an error log: if let Some(outer_api) = outer_api.upgrade() {
outer_api
.register_broker(
addr,
oneway,
timeout_mills,
cloned_header,
cloned_body,
)
.await
} else {
+ error!("Failed to upgrade Weak reference to BrokerOuterAPI. The instance may have been dropped.");
None
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/*let handle = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body);*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ | |
} | ||
|
||
pub struct MQClientAPIImpl { | ||
remoting_client: RocketmqDefaultClient<ClientRemotingProcessor>, | ||
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<ClientRemotingProcessor>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Potential thread-safety issue with The Recommendation:
Analysis chainPotential thread-safety issue with At line 96, the Run the following script to check if Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Check if `ArcRefCellWrapper` implements `Send` and `Sync`
# Search for implementations of `Send` and `Sync` for `ArcRefCellWrapper`
rg --type rust 'impl\s+Send\s+for\s+ArcRefCellWrapper' -A 1
rg --type rust 'impl\s+Sync\s+for\s+ArcRefCellWrapper' -A 1
Length of output: 4109 Script: #!/bin/bash
# Description: Locate the definition of `ArcRefCellWrapper` and check for `Send` and `Sync` trait implementations or bounds
# Find the file where `ArcRefCellWrapper` is defined
fd 'struct\s+ArcRefCellWrapper' --type f --exclude-dir=node_modules | head -n1 > arc_refcell_wrapper_file.txt
# If the file is found, search within it for `Send` and `Sync` implementations
FILE=$(cat arc_refcell_wrapper_file.txt)
if [ -n "$FILE" ]; then
echo "Searching for Send and Sync implementations in $FILE"
rg 'impl\s+Send\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
rg 'impl\s+Sync\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
# Additionally, check for trait bounds in the struct definition
rg 'struct\s+ArcRefCellWrapper.*where\s+[^{}]*\bSend\b[^{}]*\bSync\b' "$FILE"
else
echo "ArcRefCellWrapper definition not found."
fi
Length of output: 454 Script: #!/bin/bash
# Description: Locate the definition of `ArcRefCellWrapper` and check for `Send` and `Sync` trait implementations or bounds
# Find the file where `ArcRefCellWrapper` is defined, using the correct exclude flag
fd 'struct\s+ArcRefCellWrapper' --type f --exclude '*node_modules*' | head -n1 > arc_refcell_wrapper_file.txt
# If the file is found, search within it for `Send` and `Sync` implementations
FILE=$(cat arc_refcell_wrapper_file.txt)
if [ -n "$FILE" ]; then
echo "Searching for Send and Sync implementations in $FILE"
rg 'impl\s+Send\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
rg 'impl\s+Sync\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
# Additionally, check for trait bounds in the struct definition
rg 'struct\s+ArcRefCellWrapper.*where\s+[^{}]*\bSend\b[^{}]*\bSync\b' "$FILE"
else
echo "ArcRefCellWrapper definition not found. Listing all occurrences of ArcRefCellWrapper for manual inspection."
rg 'ArcRefCellWrapper' --type rust
fi
Length of output: 33991 Script: #!/bin/bash
# Description: Locate the definition of `ArcRefCellWrapper` in `rocketmq-common` and check for `Send` and `Sync` trait implementations or bounds
# Define the specific path to `rocketmq-common/src/lib.rs`
FILE="rocketmq-common/src/lib.rs"
# Check if the file exists
if [ -f "$FILE" ]; then
echo "Searching for Send and Sync implementations in $FILE"
rg 'impl\s+Send\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
rg 'impl\s+Sync\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
# Additionally, check for trait bounds in the struct definition
rg 'struct\s+ArcRefCellWrapper.*where\s+[^{}]*\bSend\b[^{}]*\bSync\b' "$FILE"
else
echo "$FILE not found. Please verify the path or ensure `rocketmq-common` is included in the search."
fi
Length of output: 483 |
||
top_addressing: Box<dyn TopAddressing>, | ||
// client_remoting_processor: ClientRemotingProcessor, | ||
name_srv_addr: Option<String>, | ||
|
@@ -121,7 +121,7 @@ | |
} | ||
|
||
MQClientAPIImpl { | ||
remoting_client: default_client, | ||
remoting_client: ArcRefCellWrapper::new(default_client), | ||
top_addressing: Box::new(DefaultTopAddressing::new( | ||
mix_all::get_ws_addr(), | ||
client_config.unit_name.clone(), | ||
|
@@ -133,7 +133,8 @@ | |
} | ||
|
||
pub async fn start(&self) { | ||
self.remoting_client.start().await; | ||
let client = ArcRefCellWrapper::downgrade(&self.remoting_client); | ||
self.remoting_client.start(client).await; | ||
} | ||
|
||
pub async fn fetch_name_server_addr(&mut self) -> Option<String> { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,6 +22,7 @@ | |||||||||||||||||
|
||||||||||||||||||
use rand::Rng; | ||||||||||||||||||
use rocketmq_common::ArcRefCellWrapper; | ||||||||||||||||||
use rocketmq_common::WeakCellWrapper; | ||||||||||||||||||
use rocketmq_runtime::RocketMQRuntime; | ||||||||||||||||||
use tokio::sync::Mutex; | ||||||||||||||||||
use tokio::time; | ||||||||||||||||||
|
@@ -46,7 +47,6 @@ | |||||||||||||||||
|
||||||||||||||||||
pub type ArcSyncClient = Arc<Mutex<Client>>; | ||||||||||||||||||
|
||||||||||||||||||
#[derive(Clone)] | ||||||||||||||||||
pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> { | ||||||||||||||||||
tokio_client_config: Arc<TokioClientConfig>, | ||||||||||||||||||
//cache connection | ||||||||||||||||||
|
@@ -239,22 +239,16 @@ | |||||||||||||||||
|
||||||||||||||||||
#[allow(unused_variables)] | ||||||||||||||||||
impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR> { | ||||||||||||||||||
async fn start(&self) { | ||||||||||||||||||
let client = self.clone(); | ||||||||||||||||||
//invoke scan available name sever now | ||||||||||||||||||
client.scan_available_name_srv().await; | ||||||||||||||||||
/*let handle = task::spawn(async move { | ||||||||||||||||||
loop { | ||||||||||||||||||
time::sleep(Duration::from_millis(1)).await; | ||||||||||||||||||
client.scan_available_name_srv().await; | ||||||||||||||||||
} | ||||||||||||||||||
});*/ | ||||||||||||||||||
self.client_runtime.get_handle().spawn(async move { | ||||||||||||||||||
loop { | ||||||||||||||||||
time::sleep(Duration::from_millis(1)).await; | ||||||||||||||||||
client.scan_available_name_srv().await; | ||||||||||||||||||
} | ||||||||||||||||||
}); | ||||||||||||||||||
async fn start(&self, this: WeakCellWrapper<Self>) { | ||||||||||||||||||
if let Some(client) = this.upgrade() { | ||||||||||||||||||
client.scan_available_name_srv().await; | ||||||||||||||||||
self.client_runtime.get_handle().spawn(async move { | ||||||||||||||||||
loop { | ||||||||||||||||||
time::sleep(Duration::from_millis(1)).await; | ||||||||||||||||||
client.scan_available_name_srv().await; | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+246
to
+249
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Increase sleep duration in the loop to reduce CPU usage The loop in the spawned task sleeps for only 1 millisecond between iterations. This tight loop may lead to high CPU usage. Consider increasing the sleep duration to balance performance and resource consumption. Apply this diff to increase the sleep duration: loop {
- time::sleep(Duration::from_millis(1)).await;
+ time::sleep(Duration::from_secs(1)).await;
client.scan_available_name_srv().await;
} Committable suggestion
Suggested change
|
||||||||||||||||||
}); | ||||||||||||||||||
Comment on lines
+246
to
+250
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Implement graceful shutdown for the spawned task The spawned task runs an infinite loop without any exit condition or cancellation handling. To prevent potential resource leaks and enable proper shutdown of the service, consider implementing a mechanism to gracefully terminate the loop when the service is stopped. |
||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
fn shutdown(&mut self) { | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,8 @@ | |
*/ | ||
use std::sync::Arc; | ||
|
||
use rocketmq_common::WeakCellWrapper; | ||
|
||
use crate::base::response_future::ResponseFuture; | ||
use crate::protocol::remoting_command::RemotingCommand; | ||
use crate::runtime::RPCHook; | ||
|
@@ -36,7 +38,7 @@ pub trait RemotingService: Send { | |
/// This function should initialize and start the service, making it ready to handle incoming | ||
/// or outgoing remote procedure calls. The exact implementation details, such as opening | ||
/// network connections or preparing internal state, are left to the implementor. | ||
async fn start(&self); | ||
async fn start(&self, this: WeakCellWrapper<Self>); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Incomplete update of The following implementations of
Please update these implementations to match the new method signature to ensure consistency and prevent potential breaking changes. Analysis chainVerify impact of The modification to the
The use of To ensure this change doesn't introduce unintended consequences:
Run the following script to identify affected implementations: Please review the results and ensure all implementations are updated to match the new signature. Consider documenting this change in the crate's changelog and updating any relevant documentation to reflect the new parameter and its purpose. Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Find all implementations of RemotingService trait and their start methods
# Find RemotingService implementations
echo "RemotingService implementations:"
rg --type rust "impl.*RemotingService.*for" -g '!target/'
# Find start method implementations
echo "\nstart method implementations:"
rg --type rust "async fn start.*&self" -g '!target/'
Length of output: 788 |
||
|
||
/// Shuts down the remoting service. | ||
/// | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure Thread Safety with
ArcRefCellWrapper
UsageThe change to use
ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>
forremoting_client
introduces interior mutability viaRefCell
. In asynchronous and potentially multithreaded contexts,RefCell
is not thread-safe, which can lead to runtime borrow panics if mutable borrows overlap across threads. Consider usingArc<Mutex<T>>
orArc<RwLock<T>>
for thread-safe interior mutability, ensuring that access toremoting_client
is properly synchronized.