Skip to content

[ISSUE #986]🚀Add Runtime for ConsumeMessageConcurrentlyService⚡️ #987

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use rocketmq_client::consumer::mq_push_consumer::MQPushConsumer;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_rust::rocketmq;
use tracing::info;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM: Improved logging mechanism

Changing from println! to info! macro from the tracing crate is a significant improvement. This provides structured logging capabilities, which are beneficial for log management and analysis. The tracing crate is a modern, flexible logging framework for Rust, suitable for asynchronous contexts.

Consider adding more context to the log message. For example:

info!("Received message: topic={}, tags={}, keys={}, body={:?}", msg.get_topic(), msg.get_tags(), msg.get_keys(), msg.get_body());

This would provide more detailed information in the logs, making debugging and monitoring easier.

Also applies to: 61-61


pub const MESSAGE_COUNT: usize = 1;
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";
Expand All @@ -32,7 +33,7 @@ pub const TAG: &str = "*";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
//rocketmq_common::log::init_logger();
rocketmq_common::log::init_logger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Potential Panic in Logger Initialization

The init_logger() function currently uses .unwrap() when parsing the log level, which can cause the application to panic if an invalid RUST_LOG value is provided. It's recommended to handle this scenario gracefully to improve the application's robustness.

  • File: rocketmq-common/src/log.rs
    • Line with .unwrap():
      .with_max_level(LevelFilter::from_str(info_level.as_str()).unwrap())
Analysis chain

LGTM: Logger initialization activated

Activating the logger initialization is a good practice for proper logging in the application. This change aligns well with the overall improvement in the logging mechanism.

To ensure the init_logger() function is properly implemented, please run the following script:

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the implementation of init_logger() function

# Test: Search for the init_logger function definition
rg --type rust -A 5 'fn init_logger'

Length of output: 424


Script:

#!/bin/bash
# Description: Retrieve the full implementation of the init_logger() function

rg --type rust 'fn init_logger' -A 10 rocketmq-common/src/log.rs

Length of output: 417


Script:

#!/bin/bash
# Description: Find all usages of the init_logger() function to verify correct invocation

rg --type rust 'init_logger\(\)' 

Length of output: 927


// create a producer builder with default configuration
let builder = DefaultMQPushConsumer::builder();
Expand All @@ -57,7 +58,7 @@ impl MessageListenerConcurrently for MyMessageListener {
_context: &ConsumeConcurrentlyContext,
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
println!("Receive message: {:?}", msg);
info!("Receive message: {:?}", msg);
}
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_runtime::RocketMQRuntime;
use tracing::info;
use tracing::warn;

Expand All @@ -45,14 +46,13 @@
use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently;
use crate::hook::consume_message_context::ConsumeMessageContext;

#[derive(Clone)]
pub struct ConsumeMessageConcurrentlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerConcurrently,
// pub(crate) consume_runtime: Arc<RocketMQRuntime>,
pub(crate) consume_runtime: RocketMQRuntime,
}

impl ConsumeMessageConcurrentlyService {
Expand All @@ -64,27 +64,46 @@
default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
) -> Self {
let consume_thread = consumer_config.consume_thread_max;
let consumer_group_tag = format!("{}_{}", "ConsumeMessageThread_", consumer_group);

Check warning on line 67 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L67

Added line #L67 was not covered by tests
Self {
default_mqpush_consumer_impl,
client_config,
consumer_config,
consumer_group: Arc::new(consumer_group),
message_listener,
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
consume_runtime: RocketMQRuntime::new_multi(

Check warning on line 74 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L74

Added line #L74 was not covered by tests
consume_thread as usize,
"ConsumeMessageThread_",
)),*/
consumer_group_tag.as_str(),

Check warning on line 76 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L76

Added line #L76 was not covered by tests
),
}
}
}

impl ConsumeMessageConcurrentlyService {
async fn clean_expire_msg(&mut self) {
println!("===========================")
if let Some(default_mqpush_consumer_impl) = self

Check warning on line 84 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L84

Added line #L84 was not covered by tests
.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade()

Check warning on line 88 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L88

Added line #L88 was not covered by tests
{
let process_queue_table = default_mqpush_consumer_impl

Check warning on line 90 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L90

Added line #L90 was not covered by tests
.rebalance_impl
.rebalance_impl_inner
.process_queue_table
.read()
.await;
for (_, process_queue) in process_queue_table.iter() {
process_queue
.clean_expired_msg(self.default_mqpush_consumer_impl.clone())
.await;

Check warning on line 99 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L95-L99

Added lines #L95 - L99 were not covered by tests
}
}

Check warning on line 101 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L101

Added line #L101 was not covered by tests
Comment on lines +84 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle potential None case when upgrading weak reference

In the clean_expire_msg method, when upgrading the default_mqpush_consumer_impl, there is an assumption that the upgrade will succeed. However, if the upgrade fails, it might cause a panic.

Consider handling the None case to prevent potential runtime errors:

if let Some(default_mqpush_consumer_impl) = self
    .default_mqpush_consumer_impl
    .as_ref()
    .unwrap()
    .upgrade()
{
    // existing code
} else {
    warn!("Failed to upgrade default_mqpush_consumer_impl; it may have been dropped");
}

Committable suggestion was skipped due to low confidence.

}

async fn process_consume_result(
&mut self,
this: ArcRefCellWrapper<Self>,

Check warning on line 106 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L106

Added line #L106 was not covered by tests
status: ConsumeConcurrentlyStatus,
context: &ConsumeConcurrentlyContext,
consume_request: &mut ConsumeRequest,
Expand Down Expand Up @@ -141,6 +160,7 @@
consume_request.msgs.append(&mut msg_back_success);
self.submit_consume_request_later(
msg_back_failed,
this,

Check warning on line 163 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L163

Added line #L163 was not covered by tests
consume_request.process_queue.clone(),
consume_request.message_queue.clone(),
);
Expand Down Expand Up @@ -171,13 +191,14 @@
fn submit_consume_request_later(
&self,
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>,
this: ArcRefCellWrapper<Self>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
) {
let this = self.clone();
tokio::spawn(async move {
self.consume_runtime.get_handle().spawn(async move {

Check warning on line 198 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L198

Added line #L198 was not covered by tests
tokio::time::sleep(Duration::from_secs(5)).await;
this.submit_consume_request(msgs, process_queue, message_queue, true)
let this_ = this.clone();
this.submit_consume_request(this_, msgs, process_queue, message_queue, true)

Check warning on line 201 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L200-L201

Added lines #L200 - L201 were not covered by tests
.await;
});
}
Expand Down Expand Up @@ -211,9 +232,8 @@
}

impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
fn start(&mut self) {
let mut this = self.clone();
tokio::spawn(async move {
fn start(&mut self, mut this: ArcRefCellWrapper<Self>) {
self.consume_runtime.get_handle().spawn(async move {

Check warning on line 236 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L235-L236

Added lines #L235 - L236 were not covered by tests
let timeout = this.consumer_config.consume_timeout;
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60));
interval.tick().await;
Expand Down Expand Up @@ -254,6 +274,7 @@

async fn submit_consume_request(
&self,
this: ArcRefCellWrapper<Self>,

Check warning on line 277 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L277

Added line #L277 was not covered by tests
mut msgs: Vec<ArcRefCellWrapper<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
Expand All @@ -270,13 +291,10 @@
consumer_group: self.consumer_group.as_ref().clone(),
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
};
let consume_message_concurrently_service = self.clone();

tokio::spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
});
self.consume_runtime

Check warning on line 295 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L295

Added line #L295 was not covered by tests
.get_handle()
.spawn(async move { consume_request.run(this).await });

Check warning on line 297 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L297

Added line #L297 was not covered by tests
} else {
loop {
let item = if msgs.len() > consume_batch_size as usize {
Expand All @@ -294,13 +312,8 @@
consumer_group: self.consumer_group.as_ref().clone(),
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
};
let consume_message_concurrently_service = self.clone();
/* self.consume_runtime.get_handle().spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
});*/
tokio::spawn(async move {
let consume_message_concurrently_service = this.clone();
self.consume_runtime.get_handle().spawn(async move {

Check warning on line 316 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L315-L316

Added lines #L315 - L316 were not covered by tests
consume_request
.run(consume_message_concurrently_service)
.await
Expand Down Expand Up @@ -335,7 +348,9 @@
impl ConsumeRequest {
async fn run(
&mut self,
mut consume_message_concurrently_service: ConsumeMessageConcurrentlyService,
mut consume_message_concurrently_service: ArcRefCellWrapper<

Check warning on line 351 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L351

Added line #L351 was not covered by tests
ConsumeMessageConcurrentlyService,
>,
) {
if self.process_queue.is_dropped() {
info!(
Expand Down Expand Up @@ -456,8 +471,9 @@
self.consumer_group, self.message_queue,
);
} else {
let this = consume_message_concurrently_service.clone();

Check warning on line 474 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L474

Added line #L474 was not covered by tests
consume_message_concurrently_service
.process_consume_result(status.unwrap(), &context, self)
.process_consume_result(this, status.unwrap(), &context, self)

Check warning on line 476 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L476

Added line #L476 was not covered by tests
.await;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
pub struct ConsumeMessageOrderlyService;

impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService {
fn start(&mut self) {
fn start(&mut self, this: ArcRefCellWrapper<Self>) {

Check warning on line 32 in rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L32

Added line #L32 was not covered by tests
todo!()
}
Comment on lines +32 to 34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Implement the start Method Across All Services

The start method is currently unimplemented in multiple services, which can lead to initialization issues. Please implement the start method in the following files:

  • rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs
  • rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs
  • rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs
  • rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs

Ensure each implementation initializes necessary resources, sets up background tasks, and includes appropriate startup logic.

Analysis chain

Implement the start method

The start method is crucial for initializing the service, but it's currently not implemented. Please provide an implementation for this method.

Consider the following steps for implementation:

  1. Initialize any necessary resources or state.
  2. Set up any required background tasks or threads.
  3. Implement any startup logic specific to ConsumeMessageOrderlyService.

Example skeleton:

fn start(&mut self, this: ArcRefCellWrapper<Self>) {
    // Initialize resources
    
    // Set up background tasks
    
    // Implement startup logic
    
    log::info!("ConsumeMessageOrderlyService started");
}

To ensure this method is properly implemented across the codebase, run the following command:

This will show all implementations of the start method with the new signature, allowing us to verify that they are properly implemented.

Scripts executed

The following scripts were executed for the analysis:

Script:

rg --type rust "fn start\(&mut self, this: ArcRefCellWrapper<Self>\)" -A 10

Length of output: 5311


Script:

#!/bin/bash
# Search for the `start` method with the specific signature and check for `todo!()`
rg --type rust 'fn start\(&mut self, this: ArcRefCellWrapper<Self>\)' -A 5

Length of output: 2646


Expand Down Expand Up @@ -63,6 +63,7 @@

async fn submit_consume_request(
&self,
this: ArcRefCellWrapper<Self>,
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
}

impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
fn start(&mut self) {
// nothing to do
fn start(&mut self, this: ArcRefCellWrapper<Self>) {

Check warning on line 60 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L60

Added line #L60 was not covered by tests
//todo!()
Comment on lines +60 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Complete the start method implementation and ensure it adheres to existing concurrency patterns.

The start method in ConsumeMessagePopConcurrentlyService now includes this: ArcRefCellWrapper<Self>, aligning with the common usage of ArcRefCellWrapper across the codebase for managing shared, mutable state. However, the method currently contains a todo!(), which prevents the service from starting correctly.

Please implement the start method with the necessary logic to initialize and run the service. Additionally, ensure that the usage of ArcRefCellWrapper<Self> is consistent with how it's applied in other parts of the codebase to maintain concurrency safety and shared access patterns.

Analysis chain

Implement the start method and clarify the use of ArcRefCellWrapper.

The start method signature has been updated to include this: ArcRefCellWrapper<Self>, which suggests a change in how the service instance is managed. However, there are two concerns:

  1. The method body uses todo!(), indicating that the implementation is incomplete. This is critical for a start method in a service.
  2. The purpose and usage of ArcRefCellWrapper<Self> are not clear from this context.

Please implement the start method with the necessary logic to initialize and start the service. Also, consider adding a comment explaining the rationale behind using ArcRefCellWrapper<Self> and how it contributes to the service's functionality or concurrency model.

To ensure consistency across the codebase, let's check for other usages of ArcRefCellWrapper:

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other usages of ArcRefCellWrapper in method signatures
rg --type rust "fn.*ArcRefCellWrapper<.*>" -g '!target/'

Length of output: 686


Script:

#!/bin/bash
# Search for all usages of ArcRefCellWrapper in the codebase
rg "ArcRefCellWrapper" -g '!target/'

Length of output: 30256

}

fn shutdown(&mut self, await_terminate_millis: u64) {
Expand Down Expand Up @@ -91,6 +91,7 @@

async fn submit_consume_request(
&self,
this: ArcRefCellWrapper<Self>,
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
pub struct ConsumeMessagePopOrderlyService;

impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
fn start(&mut self) {
fn start(&mut self, this: ArcRefCellWrapper<Self>) {

Check warning on line 32 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs#L32

Added line #L32 was not covered by tests
todo!()
}

Expand Down Expand Up @@ -63,6 +63,7 @@

async fn submit_consume_request(
&self,
this: ArcRefCellWrapper<Self>,
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ where
T: ConsumeMessageServiceTrait,
K: ConsumeMessageServiceTrait,
{
pub consume_message_concurrently_service: T,
pub consume_message_pop_concurrently_service: K,
pub consume_message_concurrently_service: ArcRefCellWrapper<T>,
pub consume_message_pop_concurrently_service: ArcRefCellWrapper<K>,
}
pub struct ConsumeMessageOrderlyServiceGeneral<T, K>
where
T: ConsumeMessageServiceTrait,
K: ConsumeMessageServiceTrait,
{
pub consume_message_orderly_service: T,
pub consume_message_pop_orderly_service: K,
pub consume_message_orderly_service: ArcRefCellWrapper<T>,
pub consume_message_pop_orderly_service: ArcRefCellWrapper<K>,
}

pub trait ConsumeMessageServiceTrait {
fn start(&mut self);
fn start(&mut self, this: ArcRefCellWrapper<Self>);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reassess passing this as ArcRefCellWrapper<Self> in start method

The start method signature now includes this: ArcRefCellWrapper<Self>. Passing self wrapped in an ArcRefCellWrapper can lead to self-referential structures, which may cause issues with Rust's ownership model and potential runtime problems. Consider whether this pattern is necessary or if there's an alternative approach that avoids potential pitfalls.

If possible, refactor the code to avoid passing this. Instead, consider designing the method to work with &self or refactoring the logic to eliminate the need for self-referential structures.


fn shutdown(&mut self, await_terminate_millis: u64);

Expand All @@ -63,6 +63,7 @@ pub trait ConsumeMessageServiceTrait {

async fn submit_consume_request(
&self,
this: ArcRefCellWrapper<Self>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Evaluate the inclusion of this in submit_consume_request method

The method submit_consume_request now includes this: ArcRefCellWrapper<Self> as a parameter. Similar to the start method, passing this might introduce complexities with self-referential structures and ownership issues. Assess whether this is necessary or if the method can be refactored to eliminate the need to pass this.

Consider refactoring the method to use &self or redesigning it to avoid passing this. This could prevent potential issues with ownership and lifetime management in Rust.

msgs: Vec<ArcRefCellWrapper<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,31 +272,36 @@ impl DefaultMQPushConsumerImpl {
self.consume_orderly = false;
self.consume_message_concurrently_service = Some(ArcRefCellWrapper::new(
ConsumeMessageConcurrentlyServiceGeneral {
consume_message_concurrently_service:
consume_message_concurrently_service: ArcRefCellWrapper::new(
ConsumeMessageConcurrentlyService::new(
self.client_config.clone(),
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.clone().expect("listener is None"),
self.default_mqpush_consumer_impl.clone(),
),
),
Comment on lines +275 to +283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential panic due to calling clone() before expect() on Option

In line 275, calling clone() on listener (which is an Option), before using expect("listener is None"), may cause a panic if listener is None. It's safer to call expect("listener is None") before cloning to ensure that listener is not None.

Apply this diff to fix the issue:

- listener.clone().expect("listener is None")
+ listener.expect("listener is None").clone()
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
consume_message_concurrently_service: ArcRefCellWrapper::new(
ConsumeMessageConcurrentlyService::new(
self.client_config.clone(),
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.clone().expect("listener is None"),
self.default_mqpush_consumer_impl.clone(),
),
),
consume_message_concurrently_service: ArcRefCellWrapper::new(
ConsumeMessageConcurrentlyService::new(
self.client_config.clone(),
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.expect("listener is None").clone(),
self.default_mqpush_consumer_impl.clone(),
),
),


consume_message_pop_concurrently_service:
consume_message_pop_concurrently_service: ArcRefCellWrapper::new(
ConsumeMessagePopConcurrentlyService::new(
self.client_config.clone(),
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.expect("listener is None"),
),
),
},
));
} else if message_listener.message_listener_orderly.is_some() {
self.consume_orderly = true;
self.consume_message_orderly_service = Some(ArcRefCellWrapper::new(
ConsumeMessageOrderlyServiceGeneral {
consume_message_orderly_service: ConsumeMessageOrderlyService,
consume_message_pop_orderly_service:
consume_message_orderly_service: ArcRefCellWrapper::new(
ConsumeMessageOrderlyService,
),
consume_message_pop_orderly_service: ArcRefCellWrapper::new(
ConsumeMessagePopOrderlyService,
),
Comment on lines +299 to +304
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing initialization of ConsumeMessageOrderlyService

In line 300, ConsumeMessageOrderlyService is used without calling a constructor or providing initialization parameters. If this service requires initialization, you should instantiate it properly using a constructor like ::new().

Apply this diff to fix the issue:

- ConsumeMessageOrderlyService,
+ ConsumeMessageOrderlyService::new(),

Similarly, update the initialization of ConsumeMessagePopOrderlyService accordingly:

- ConsumeMessagePopOrderlyService,
+ ConsumeMessagePopOrderlyService::new(),

Committable suggestion was skipped due to low confidence.

},
));
}
Expand All @@ -305,23 +310,37 @@ impl DefaultMQPushConsumerImpl {
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
let this = consume_message_concurrently_service
.consume_message_concurrently_service
.clone();
consume_message_concurrently_service
.consume_message_concurrently_service
.start();
.start(this);

Comment on lines +313 to +319
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary cloning and incorrect use of start() method

In lines 313-319, cloning the service and passing it to its own start() method is unnecessary and may indicate misuse. If the start() method does not require an argument, you should call it directly without cloning or passing self.

Apply this diff to fix the issue:

- let this = consume_message_concurrently_service
-     .consume_message_concurrently_service
-     .clone();
- consume_message_concurrently_service
-     .consume_message_concurrently_service
-     .start(this);
+ consume_message_concurrently_service
+     .consume_message_concurrently_service
+     .start();
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let this = consume_message_concurrently_service
.consume_message_concurrently_service
.clone();
consume_message_concurrently_service
.consume_message_concurrently_service
.start();
.start(this);
consume_message_concurrently_service
.consume_message_concurrently_service
.start();

let wrapper = consume_message_concurrently_service
.consume_message_pop_concurrently_service
.clone();
consume_message_concurrently_service
.consume_message_pop_concurrently_service
.start();
.start(wrapper);
Comment on lines +320 to +325
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary cloning and incorrect use of start() method

In lines 320-325, cloning consume_message_pop_concurrently_service and passing it to its start() method may not be required. Verify whether start() needs an argument. If not, simplify the code by calling start() directly.

Apply this diff to fix the issue:

- let wrapper = consume_message_concurrently_service
-     .consume_message_pop_concurrently_service
-     .clone();
- consume_message_concurrently_service
-     .consume_message_pop_concurrently_service
-     .start(wrapper);
+ consume_message_concurrently_service
+     .consume_message_pop_concurrently_service
+     .start();
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let wrapper = consume_message_concurrently_service
.consume_message_pop_concurrently_service
.clone();
consume_message_concurrently_service
.consume_message_pop_concurrently_service
.start();
.start(wrapper);
consume_message_concurrently_service
.consume_message_pop_concurrently_service
.start();

}

if let Some(consume_message_orderly_service) =
self.consume_message_orderly_service.as_mut()
{
let wrapper = consume_message_orderly_service
.consume_message_orderly_service
.clone();
consume_message_orderly_service
.consume_message_orderly_service
.start();
.start(wrapper);

let wrapper = consume_message_orderly_service
.consume_message_pop_orderly_service
.clone();
consume_message_orderly_service
.consume_message_pop_orderly_service
.start();
.start(wrapper);
Comment on lines +331 to +343
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary cloning and incorrect use of start() method

In lines 331-343, cloning service instances and passing them to their own start() methods may be unnecessary. If the start() methods do not require an argument, you can simplify the code by calling start() directly without cloning.

Apply this diff to fix the issue:

- let wrapper = consume_message_orderly_service
-     .consume_message_orderly_service
-     .clone();
- consume_message_orderly_service
-     .consume_message_orderly_service
-     .start(wrapper);
+ consume_message_orderly_service
+     .consume_message_orderly_service
+     .start();

- let wrapper = consume_message_orderly_service
-     .consume_message_pop_orderly_service
-     .clone();
- consume_message_orderly_service
-     .consume_message_pop_orderly_service
-     .start(wrapper);
+ consume_message_orderly_service
+     .consume_message_pop_orderly_service
+     .start();
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let wrapper = consume_message_orderly_service
.consume_message_orderly_service
.clone();
consume_message_orderly_service
.consume_message_orderly_service
.start();
.start(wrapper);
let wrapper = consume_message_orderly_service
.consume_message_pop_orderly_service
.clone();
consume_message_orderly_service
.consume_message_pop_orderly_service
.start();
.start(wrapper);
consume_message_orderly_service
.consume_message_orderly_service
.start();
consume_message_orderly_service
.consume_message_pop_orderly_service
.start();

}
let consumer_impl = self.clone();
self.client_instance
Expand Down
Loading
Loading