Skip to content

[ISSUE #994]⚡️Optimize RocketmqDefaultClient code #995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@
self.broker_config.broker_ip1, self.server_config.listen_port
);
let broker_id = self.broker_config.broker_identity.broker_id;
let weak = Arc::downgrade(&self.broker_out_api);

Check warning on line 898 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L898

Added line #L898 was not covered by tests
self.broker_out_api
.register_broker_all(
cluster_name,
Expand All @@ -910,6 +911,7 @@
false,
None,
Default::default(),
weak,

Check warning on line 914 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L914

Added line #L914 was not covered by tests
)
.await;
}
Expand Down Expand Up @@ -1012,6 +1014,7 @@
self.broker_config.broker_ip1, self.server_config.listen_port
);
let broker_id = self.broker_config.broker_identity.broker_id;
let weak = Arc::downgrade(&self.broker_out_api);

Check warning on line 1017 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1017

Added line #L1017 was not covered by tests
self.broker_out_api
.register_broker_all(
cluster_name,
Expand All @@ -1027,6 +1030,7 @@
false,
None,
Default::default(),
weak,

Check warning on line 1033 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1033

Added line #L1033 was not covered by tests
)
.await;
}
Expand Down
39 changes: 28 additions & 11 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/
use std::sync::Arc;
use std::sync::Weak;

use dns_lookup::lookup_host;
use rocketmq_common::common::broker::broker_config::BrokerIdentity;
use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::utils::crc32_utils;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
use rocketmq_remoting::code::request_code::RequestCode;
Expand All @@ -46,18 +48,19 @@
use tracing::error;
use tracing::info;

#[derive(Clone)]
pub struct BrokerOuterAPI {
remoting_client: RocketmqDefaultClient,
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure Thread Safety with ArcRefCellWrapper Usage

The change to use ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>> for remoting_client introduces interior mutability via RefCell. In asynchronous and potentially multithreaded contexts, RefCell is not thread-safe, which can lead to runtime borrow panics if mutable borrows overlap across threads. Consider using Arc<Mutex<T>> or Arc<RwLock<T>> for thread-safe interior mutability, ensuring that access to remoting_client is properly synchronized.

name_server_address: Option<String>,
rpc_client: RpcClientImpl,
client_metadata: ClientMetadata,
}

impl BrokerOuterAPI {
pub fn new(tokio_client_config: Arc<TokioClientConfig>) -> Self {
let client =
RocketmqDefaultClient::new(tokio_client_config, DefaultRemotingRequestProcessor);
let client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(

Check warning on line 60 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L60

Added line #L60 was not covered by tests
tokio_client_config,
DefaultRemotingRequestProcessor,
));

Check warning on line 63 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L63

Added line #L63 was not covered by tests
let client_metadata = ClientMetadata::new();
Self {
remoting_client: client.clone(),
Expand All @@ -71,8 +74,10 @@
tokio_client_config: Arc<TokioClientConfig>,
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
) -> Self {
let mut client =
RocketmqDefaultClient::new(tokio_client_config, DefaultRemotingRequestProcessor);
let mut client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(

Check warning on line 77 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L77

Added line #L77 was not covered by tests
tokio_client_config,
DefaultRemotingRequestProcessor,
));

Check warning on line 80 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L80

Added line #L80 was not covered by tests
let client_metadata = ClientMetadata::new();
if let Some(rpc_hook) = rpc_hook {
client.register_rpc_hook(rpc_hook);
Expand Down Expand Up @@ -108,7 +113,8 @@

impl BrokerOuterAPI {
pub async fn start(&self) {
self.remoting_client.start().await;
let wrapper = ArcRefCellWrapper::downgrade(&self.remoting_client);
self.remoting_client.start(wrapper).await;

Check warning on line 117 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L116-L117

Added lines #L116 - L117 were not covered by tests
}

pub async fn update_name_server_address_list(&self, addrs: String) {
Expand Down Expand Up @@ -143,6 +149,7 @@
compressed: bool,
heartbeat_timeout_millis: Option<i64>,
_broker_identity: BrokerIdentity,
this: Weak<Self>,

Check warning on line 152 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L152

Added line #L152 was not covered by tests
) -> Vec<RegisterBrokerResult> {
let name_server_address_list = self.remoting_client.get_available_name_srv_list();
let mut register_broker_result_list = Vec::new();
Expand Down Expand Up @@ -173,11 +180,21 @@
let cloned_body = body.clone();
let cloned_header = request_header.clone();
let addr = namesrv_addr.clone();
let outer_api = self.clone();
let outer_api = this.clone();

Check warning on line 183 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L183

Added line #L183 was not covered by tests
let join_handle = tokio::spawn(async move {
outer_api
.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body)
.await
if let Some(outer_api) = outer_api.upgrade() {
outer_api

Check warning on line 186 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L185-L186

Added lines #L185 - L186 were not covered by tests
.register_broker(
addr,
oneway,
timeout_mills,
cloned_header,
cloned_body,

Check warning on line 192 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L188-L192

Added lines #L188 - L192 were not covered by tests
)
.await
} else {
None

Check warning on line 196 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L194-L196

Added lines #L194 - L196 were not covered by tests
}
Comment on lines +183 to +197
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add Logging for Failed Weak Reference Upgrades

In the asynchronous task within register_broker_all, if outer_api.upgrade() returns None, the code currently returns None silently. For better debuggability, consider adding a log message to indicate that the BrokerOuterAPI instance has been dropped and the weak reference could not be upgraded.

You can modify the code as follows to add an error log:

 if let Some(outer_api) = outer_api.upgrade() {
     outer_api
         .register_broker(
             addr,
             oneway,
             timeout_mills,
             cloned_header,
             cloned_body,
         )
         .await
 } else {
+    error!("Failed to upgrade Weak reference to BrokerOuterAPI. The instance may have been dropped.");
     None
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let outer_api = this.clone();
let join_handle = tokio::spawn(async move {
outer_api
.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body)
.await
if let Some(outer_api) = outer_api.upgrade() {
outer_api
.register_broker(
addr,
oneway,
timeout_mills,
cloned_header,
cloned_body,
)
.await
} else {
None
}
let outer_api = this.clone();
let join_handle = tokio::spawn(async move {
if let Some(outer_api) = outer_api.upgrade() {
outer_api
.register_broker(
addr,
oneway,
timeout_mills,
cloned_header,
cloned_body,
)
.await
} else {
error!("Failed to upgrade Weak reference to BrokerOuterAPI. The instance may have been dropped.");
None
}

});
/*let handle =
self.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body);*/
Expand Down
7 changes: 4 additions & 3 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
}

pub struct MQClientAPIImpl {
remoting_client: RocketmqDefaultClient<ClientRemotingProcessor>,
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<ClientRemotingProcessor>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Potential thread-safety issue with ArcRefCellWrapper

The ArcRefCellWrapper does not implement the Send and Sync traits. Since RefCell is inherently not thread-safe, wrapping it within an Arc using ArcRefCellWrapper does not ensure thread safety across multiple threads. This can lead to runtime borrow panics if MQClientAPIImpl is accessed concurrently.

Recommendation:

  • Replace ArcRefCellWrapper with Arc<Mutex<_>> or Arc<RwLock<_>> to provide thread-safe interior mutability.
Analysis chain

Potential thread-safety issue with ArcRefCellWrapper

At line 96, the remoting_client field now uses ArcRefCellWrapper<RocketmqDefaultClient<ClientRemotingProcessor>>. The use of RefCell inside an Arc is not thread-safe because RefCell is not Sync or Send. If MQClientAPIImpl is accessed from multiple threads, this could lead to runtime borrow panics. Consider using Arc<Mutex<_>> or Arc<RwLock<_>> for thread-safe interior mutability.

Run the following script to check if ArcRefCellWrapper implements the Send and Sync traits:

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if `ArcRefCellWrapper` implements `Send` and `Sync`

# Search for implementations of `Send` and `Sync` for `ArcRefCellWrapper`
rg --type rust 'impl\s+Send\s+for\s+ArcRefCellWrapper' -A 1
rg --type rust 'impl\s+Sync\s+for\s+ArcRefCellWrapper' -A 1

Length of output: 4109


Script:

#!/bin/bash
# Description: Locate the definition of `ArcRefCellWrapper` and check for `Send` and `Sync` trait implementations or bounds

# Find the file where `ArcRefCellWrapper` is defined
fd 'struct\s+ArcRefCellWrapper' --type f --exclude-dir=node_modules | head -n1 > arc_refcell_wrapper_file.txt

# If the file is found, search within it for `Send` and `Sync` implementations
FILE=$(cat arc_refcell_wrapper_file.txt)
if [ -n "$FILE" ]; then
    echo "Searching for Send and Sync implementations in $FILE"
    rg 'impl\s+Send\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
    rg 'impl\s+Sync\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
    # Additionally, check for trait bounds in the struct definition
    rg 'struct\s+ArcRefCellWrapper.*where\s+[^{}]*\bSend\b[^{}]*\bSync\b' "$FILE"
else
    echo "ArcRefCellWrapper definition not found."
fi

Length of output: 454


Script:

#!/bin/bash
# Description: Locate the definition of `ArcRefCellWrapper` and check for `Send` and `Sync` trait implementations or bounds

# Find the file where `ArcRefCellWrapper` is defined, using the correct exclude flag
fd 'struct\s+ArcRefCellWrapper' --type f --exclude '*node_modules*' | head -n1 > arc_refcell_wrapper_file.txt

# If the file is found, search within it for `Send` and `Sync` implementations
FILE=$(cat arc_refcell_wrapper_file.txt)
if [ -n "$FILE" ]; then
    echo "Searching for Send and Sync implementations in $FILE"
    rg 'impl\s+Send\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
    rg 'impl\s+Sync\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
    # Additionally, check for trait bounds in the struct definition
    rg 'struct\s+ArcRefCellWrapper.*where\s+[^{}]*\bSend\b[^{}]*\bSync\b' "$FILE"
else
    echo "ArcRefCellWrapper definition not found. Listing all occurrences of ArcRefCellWrapper for manual inspection."
    rg 'ArcRefCellWrapper' --type rust
fi

Length of output: 33991


Script:

#!/bin/bash
# Description: Locate the definition of `ArcRefCellWrapper` in `rocketmq-common` and check for `Send` and `Sync` trait implementations or bounds

# Define the specific path to `rocketmq-common/src/lib.rs`
FILE="rocketmq-common/src/lib.rs"

# Check if the file exists
if [ -f "$FILE" ]; then
    echo "Searching for Send and Sync implementations in $FILE"
    rg 'impl\s+Send\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
    rg 'impl\s+Sync\s+for\s+ArcRefCellWrapper' "$FILE" -A 2
    # Additionally, check for trait bounds in the struct definition
    rg 'struct\s+ArcRefCellWrapper.*where\s+[^{}]*\bSend\b[^{}]*\bSync\b' "$FILE"
else
    echo "$FILE not found. Please verify the path or ensure `rocketmq-common` is included in the search."
fi

Length of output: 483

top_addressing: Box<dyn TopAddressing>,
// client_remoting_processor: ClientRemotingProcessor,
name_srv_addr: Option<String>,
Expand Down Expand Up @@ -121,7 +121,7 @@
}

MQClientAPIImpl {
remoting_client: default_client,
remoting_client: ArcRefCellWrapper::new(default_client),

Check warning on line 124 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L124

Added line #L124 was not covered by tests
top_addressing: Box::new(DefaultTopAddressing::new(
mix_all::get_ws_addr(),
client_config.unit_name.clone(),
Expand All @@ -133,7 +133,8 @@
}

pub async fn start(&self) {
self.remoting_client.start().await;
let client = ArcRefCellWrapper::downgrade(&self.remoting_client);
self.remoting_client.start(client).await;
}

pub async fn fetch_name_server_addr(&mut self) -> Option<String> {
Expand Down
9 changes: 5 additions & 4 deletions rocketmq-namesrv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::remoting_server::server::RocketMQServer;
use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
Expand Down Expand Up @@ -49,7 +50,7 @@
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
name_server_runtime: Option<RocketMQRuntime>,
remoting_client: RocketmqDefaultClient,
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient>,
}

impl NameServerBootstrap {
Expand Down Expand Up @@ -142,10 +143,10 @@
let name_server_config = Arc::new(self.name_server_config.unwrap());
let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread");
let tokio_client_config = Arc::new(TokioClientConfig::default());
let remoting_client = RocketmqDefaultClient::new(
let remoting_client = ArcRefCellWrapper::new(RocketmqDefaultClient::new(

Check warning on line 146 in rocketmq-namesrv/src/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/bootstrap.rs#L146

Added line #L146 was not covered by tests
tokio_client_config.clone(),
DefaultRemotingRequestProcessor,
);
));

NameServerBootstrap {
name_server_runtime: NameServerRuntime {
Expand All @@ -154,7 +155,7 @@
server_config: Arc::new(self.server_config.unwrap()),
route_info_manager: Arc::new(parking_lot::RwLock::new(RouteInfoManager::new(
name_server_config.clone(),
Arc::new(remoting_client.clone()),
remoting_client.clone(),

Check warning on line 158 in rocketmq-namesrv/src/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/bootstrap.rs#L158

Added line #L158 was not covered by tests
))),
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
name_server_config,
Expand Down
5 changes: 3 additions & 2 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use rocketmq_common::common::mix_all;
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_common::common::TopicSysFlag;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
Expand Down Expand Up @@ -73,14 +74,14 @@ pub struct RouteInfoManager {
pub(crate) filter_server_table: FilterServerTable,
pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable,
pub(crate) namesrv_config: Arc<NamesrvConfig>,
pub(crate) remoting_client: Arc<RocketmqDefaultClient>,
pub(crate) remoting_client: ArcRefCellWrapper<RocketmqDefaultClient>,
}

#[allow(private_interfaces)]
impl RouteInfoManager {
pub fn new(
namesrv_config: Arc<NamesrvConfig>,
remoting_client: Arc<RocketmqDefaultClient>,
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient>,
) -> Self {
RouteInfoManager {
topic_queue_table: HashMap::new(),
Expand Down
28 changes: 11 additions & 17 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use rand::Rng;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::WeakCellWrapper;
use rocketmq_runtime::RocketMQRuntime;
use tokio::sync::Mutex;
use tokio::time;
Expand All @@ -46,7 +47,6 @@

pub type ArcSyncClient = Arc<Mutex<Client>>;

#[derive(Clone)]
pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
tokio_client_config: Arc<TokioClientConfig>,
//cache connection
Expand Down Expand Up @@ -239,22 +239,16 @@

#[allow(unused_variables)]
impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR> {
async fn start(&self) {
let client = self.clone();
//invoke scan available name sever now
client.scan_available_name_srv().await;
/*let handle = task::spawn(async move {
loop {
time::sleep(Duration::from_millis(1)).await;
client.scan_available_name_srv().await;
}
});*/
self.client_runtime.get_handle().spawn(async move {
loop {
time::sleep(Duration::from_millis(1)).await;
client.scan_available_name_srv().await;
}
});
async fn start(&self, this: WeakCellWrapper<Self>) {
if let Some(client) = this.upgrade() {
client.scan_available_name_srv().await;
self.client_runtime.get_handle().spawn(async move {
loop {
time::sleep(Duration::from_millis(1)).await;
client.scan_available_name_srv().await;

Check warning on line 248 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L242-L248

Added lines #L242 - L248 were not covered by tests
}
Comment on lines +246 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Increase sleep duration in the loop to reduce CPU usage

The loop in the spawned task sleeps for only 1 millisecond between iterations. This tight loop may lead to high CPU usage. Consider increasing the sleep duration to balance performance and resource consumption.

Apply this diff to increase the sleep duration:

 loop {
-    time::sleep(Duration::from_millis(1)).await;
+    time::sleep(Duration::from_secs(1)).await;
     client.scan_available_name_srv().await;
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
loop {
time::sleep(Duration::from_millis(1)).await;
client.scan_available_name_srv().await;
}
loop {
time::sleep(Duration::from_secs(1)).await;
client.scan_available_name_srv().await;
}

});
Comment on lines +246 to +250
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Implement graceful shutdown for the spawned task

The spawned task runs an infinite loop without any exit condition or cancellation handling. To prevent potential resource leaks and enable proper shutdown of the service, consider implementing a mechanism to gracefully terminate the loop when the service is stopped.

}

Check warning on line 251 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L250-L251

Added lines #L250 - L251 were not covered by tests
}

fn shutdown(&mut self) {
Expand Down
4 changes: 3 additions & 1 deletion rocketmq-remoting/src/remoting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
use std::sync::Arc;

use rocketmq_common::WeakCellWrapper;

use crate::base::response_future::ResponseFuture;
use crate::protocol::remoting_command::RemotingCommand;
use crate::runtime::RPCHook;
Expand All @@ -36,7 +38,7 @@ pub trait RemotingService: Send {
/// This function should initialize and start the service, making it ready to handle incoming
/// or outgoing remote procedure calls. The exact implementation details, such as opening
/// network connections or preparing internal state, are left to the implementor.
async fn start(&self);
async fn start(&self, this: WeakCellWrapper<Self>);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Incomplete update of start method signature across implementations.

The following implementations of RemotingService still have the old start method signature without the this: WeakCellWrapper<Self> parameter:

  • rocketmq-client/src/implementation/mq_client_api_impl.rs: pub async fn start(&self)
  • rocketmq-broker/src/out_api/broker_outer_api.rs: pub async fn start(&self)

Please update these implementations to match the new method signature to ensure consistency and prevent potential breaking changes.

Analysis chain

Verify impact of start method signature change.

The modification to the start method signature in the RemotingService trait is significant:

  1. It introduces a new parameter this: WeakCellWrapper<Self>, which changes how the service manages its lifecycle or self-referencing.
  2. This change will affect all implementations of the RemotingService trait and may potentially break existing code if not handled carefully.

The use of WeakCellWrapper suggests an attempt to avoid strong reference cycles, which is a good practice for memory management.

To ensure this change doesn't introduce unintended consequences:

  1. Verify all implementations of RemotingService are updated accordingly.
  2. Check for any potential breaking changes in dependent code.

Run the following script to identify affected implementations:

Please review the results and ensure all implementations are updated to match the new signature.

Consider documenting this change in the crate's changelog and updating any relevant documentation to reflect the new parameter and its purpose.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all implementations of RemotingService trait and their start methods

# Find RemotingService implementations
echo "RemotingService implementations:"
rg --type rust "impl.*RemotingService.*for" -g '!target/'

# Find start method implementations
echo "\nstart method implementations:"
rg --type rust "async fn start.*&self" -g '!target/'

Length of output: 788


/// Shuts down the remoting service.
///
Expand Down
10 changes: 7 additions & 3 deletions rocketmq-remoting/src/rpc/rpc_client_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::any::Any;

use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::ArcRefCellWrapper;

use crate::clients::rocketmq_default_impl::RocketmqDefaultClient;
use crate::clients::RemotingClient;
Expand All @@ -32,6 +33,7 @@
use crate::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader;
use crate::protocol::header::search_offset_response_header::SearchOffsetResponseHeader;
use crate::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetResponseHeader;
use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
use crate::rpc::client_metadata::ClientMetadata;
use crate::rpc::rpc_client::RpcClient;
use crate::rpc::rpc_client_hook::RpcClientHookFn;
Expand All @@ -40,15 +42,17 @@
use crate::rpc::rpc_response::RpcResponse;
use crate::Result;

#[derive(Clone)]
pub struct RpcClientImpl {
client_metadata: ClientMetadata,
remoting_client: RocketmqDefaultClient,
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
client_hook_list: Vec<RpcClientHookFn>,
}

impl RpcClientImpl {
pub fn new(client_metadata: ClientMetadata, remoting_client: RocketmqDefaultClient) -> Self {
pub fn new(

Check warning on line 52 in rocketmq-remoting/src/rpc/rpc_client_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/rpc/rpc_client_impl.rs#L52

Added line #L52 was not covered by tests
client_metadata: ClientMetadata,
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
) -> Self {
RpcClientImpl {
client_metadata,
remoting_client,
Expand Down
Loading