-
Notifications
You must be signed in to change notification settings - Fork 161
[ISSUE #986]🚀Add Runtime for ConsumeMessageConcurrentlyService⚡️ #987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ use rocketmq_client::consumer::mq_push_consumer::MQPushConsumer; | |
use rocketmq_client::Result; | ||
use rocketmq_common::common::message::message_ext::MessageExt; | ||
use rocketmq_rust::rocketmq; | ||
use tracing::info; | ||
|
||
pub const MESSAGE_COUNT: usize = 1; | ||
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4"; | ||
|
@@ -32,7 +33,7 @@ pub const TAG: &str = "*"; | |
#[rocketmq::main] | ||
pub async fn main() -> Result<()> { | ||
//init logger | ||
//rocketmq_common::log::init_logger(); | ||
rocketmq_common::log::init_logger(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tip Codebase Verification Potential Panic in Logger Initialization The
Analysis chainLGTM: Logger initialization activated Activating the logger initialization is a good practice for proper logging in the application. This change aligns well with the overall improvement in the logging mechanism. To ensure the Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Verify the implementation of init_logger() function
# Test: Search for the init_logger function definition
rg --type rust -A 5 'fn init_logger'
Length of output: 424 Script: #!/bin/bash
# Description: Retrieve the full implementation of the init_logger() function
rg --type rust 'fn init_logger' -A 10 rocketmq-common/src/log.rs
Length of output: 417 Script: #!/bin/bash
# Description: Find all usages of the init_logger() function to verify correct invocation
rg --type rust 'init_logger\(\)'
Length of output: 927 |
||
|
||
// create a producer builder with default configuration | ||
let builder = DefaultMQPushConsumer::builder(); | ||
|
@@ -57,7 +58,7 @@ impl MessageListenerConcurrently for MyMessageListener { | |
_context: &ConsumeConcurrentlyContext, | ||
) -> Result<ConsumeConcurrentlyStatus> { | ||
for msg in msgs { | ||
println!("Receive message: {:?}", msg); | ||
info!("Receive message: {:?}", msg); | ||
} | ||
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
use rocketmq_common::WeakCellWrapper; | ||
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; | ||
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; | ||
use rocketmq_runtime::RocketMQRuntime; | ||
use tracing::info; | ||
use tracing::warn; | ||
|
||
|
@@ -45,14 +46,13 @@ | |
use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently; | ||
use crate::hook::consume_message_context::ConsumeMessageContext; | ||
|
||
#[derive(Clone)] | ||
pub struct ConsumeMessageConcurrentlyService { | ||
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>, | ||
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>, | ||
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>, | ||
pub(crate) consumer_group: Arc<String>, | ||
pub(crate) message_listener: ArcBoxMessageListenerConcurrently, | ||
// pub(crate) consume_runtime: Arc<RocketMQRuntime>, | ||
pub(crate) consume_runtime: RocketMQRuntime, | ||
} | ||
|
||
impl ConsumeMessageConcurrentlyService { | ||
|
@@ -64,27 +64,46 @@ | |
default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>, | ||
) -> Self { | ||
let consume_thread = consumer_config.consume_thread_max; | ||
let consumer_group_tag = format!("{}_{}", "ConsumeMessageThread_", consumer_group); | ||
Self { | ||
default_mqpush_consumer_impl, | ||
client_config, | ||
consumer_config, | ||
consumer_group: Arc::new(consumer_group), | ||
message_listener, | ||
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi( | ||
consume_runtime: RocketMQRuntime::new_multi( | ||
consume_thread as usize, | ||
"ConsumeMessageThread_", | ||
)),*/ | ||
consumer_group_tag.as_str(), | ||
), | ||
} | ||
} | ||
} | ||
|
||
impl ConsumeMessageConcurrentlyService { | ||
async fn clean_expire_msg(&mut self) { | ||
println!("===========================") | ||
if let Some(default_mqpush_consumer_impl) = self | ||
.default_mqpush_consumer_impl | ||
.as_ref() | ||
.unwrap() | ||
.upgrade() | ||
{ | ||
let process_queue_table = default_mqpush_consumer_impl | ||
.rebalance_impl | ||
.rebalance_impl_inner | ||
.process_queue_table | ||
.read() | ||
.await; | ||
for (_, process_queue) in process_queue_table.iter() { | ||
process_queue | ||
.clean_expired_msg(self.default_mqpush_consumer_impl.clone()) | ||
.await; | ||
Check warning on line 99 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
} | ||
} | ||
Check warning on line 101 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
Comment on lines
+84
to
+101
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle potential In the Consider handling the if let Some(default_mqpush_consumer_impl) = self
.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade()
{
// existing code
} else {
warn!("Failed to upgrade default_mqpush_consumer_impl; it may have been dropped");
}
|
||
} | ||
|
||
async fn process_consume_result( | ||
&mut self, | ||
this: ArcRefCellWrapper<Self>, | ||
Check warning on line 106 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
status: ConsumeConcurrentlyStatus, | ||
context: &ConsumeConcurrentlyContext, | ||
consume_request: &mut ConsumeRequest, | ||
|
@@ -141,6 +160,7 @@ | |
consume_request.msgs.append(&mut msg_back_success); | ||
self.submit_consume_request_later( | ||
msg_back_failed, | ||
this, | ||
Check warning on line 163 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
consume_request.process_queue.clone(), | ||
consume_request.message_queue.clone(), | ||
); | ||
|
@@ -171,13 +191,14 @@ | |
fn submit_consume_request_later( | ||
&self, | ||
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
this: ArcRefCellWrapper<Self>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
) { | ||
let this = self.clone(); | ||
tokio::spawn(async move { | ||
self.consume_runtime.get_handle().spawn(async move { | ||
Check warning on line 198 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
tokio::time::sleep(Duration::from_secs(5)).await; | ||
this.submit_consume_request(msgs, process_queue, message_queue, true) | ||
let this_ = this.clone(); | ||
this.submit_consume_request(this_, msgs, process_queue, message_queue, true) | ||
Check warning on line 201 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
.await; | ||
}); | ||
} | ||
|
@@ -211,9 +232,8 @@ | |
} | ||
|
||
impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { | ||
fn start(&mut self) { | ||
let mut this = self.clone(); | ||
tokio::spawn(async move { | ||
fn start(&mut self, mut this: ArcRefCellWrapper<Self>) { | ||
self.consume_runtime.get_handle().spawn(async move { | ||
Check warning on line 236 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
let timeout = this.consumer_config.consume_timeout; | ||
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60)); | ||
interval.tick().await; | ||
|
@@ -254,6 +274,7 @@ | |
|
||
async fn submit_consume_request( | ||
&self, | ||
this: ArcRefCellWrapper<Self>, | ||
Check warning on line 277 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
mut msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
|
@@ -270,13 +291,10 @@ | |
consumer_group: self.consumer_group.as_ref().clone(), | ||
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), | ||
}; | ||
let consume_message_concurrently_service = self.clone(); | ||
|
||
tokio::spawn(async move { | ||
consume_request | ||
.run(consume_message_concurrently_service) | ||
.await | ||
}); | ||
self.consume_runtime | ||
Check warning on line 295 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
.get_handle() | ||
.spawn(async move { consume_request.run(this).await }); | ||
Check warning on line 297 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
} else { | ||
loop { | ||
let item = if msgs.len() > consume_batch_size as usize { | ||
|
@@ -294,13 +312,8 @@ | |
consumer_group: self.consumer_group.as_ref().clone(), | ||
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), | ||
}; | ||
let consume_message_concurrently_service = self.clone(); | ||
/* self.consume_runtime.get_handle().spawn(async move { | ||
consume_request | ||
.run(consume_message_concurrently_service) | ||
.await | ||
});*/ | ||
tokio::spawn(async move { | ||
let consume_message_concurrently_service = this.clone(); | ||
self.consume_runtime.get_handle().spawn(async move { | ||
Check warning on line 316 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
consume_request | ||
.run(consume_message_concurrently_service) | ||
.await | ||
|
@@ -335,7 +348,9 @@ | |
impl ConsumeRequest { | ||
async fn run( | ||
&mut self, | ||
mut consume_message_concurrently_service: ConsumeMessageConcurrentlyService, | ||
mut consume_message_concurrently_service: ArcRefCellWrapper< | ||
Check warning on line 351 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
ConsumeMessageConcurrentlyService, | ||
>, | ||
) { | ||
if self.process_queue.is_dropped() { | ||
info!( | ||
|
@@ -456,8 +471,9 @@ | |
self.consumer_group, self.message_queue, | ||
); | ||
} else { | ||
let this = consume_message_concurrently_service.clone(); | ||
Check warning on line 474 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
consume_message_concurrently_service | ||
.process_consume_result(status.unwrap(), &context, self) | ||
.process_consume_result(this, status.unwrap(), &context, self) | ||
Check warning on line 476 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
|
||
.await; | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ | |
pub struct ConsumeMessageOrderlyService; | ||
|
||
impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService { | ||
fn start(&mut self) { | ||
fn start(&mut self, this: ArcRefCellWrapper<Self>) { | ||
todo!() | ||
} | ||
Comment on lines
+32
to
34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tip Codebase Verification Implement the The
Ensure each implementation initializes necessary resources, sets up background tasks, and includes appropriate startup logic. Analysis chainImplement the The Consider the following steps for implementation:
Example skeleton: fn start(&mut self, this: ArcRefCellWrapper<Self>) {
// Initialize resources
// Set up background tasks
// Implement startup logic
log::info!("ConsumeMessageOrderlyService started");
} To ensure this method is properly implemented across the codebase, run the following command: This will show all implementations of the Scripts executedThe following scripts were executed for the analysis: Script: rg --type rust "fn start\(&mut self, this: ArcRefCellWrapper<Self>\)" -A 10
Length of output: 5311 Script: #!/bin/bash
# Search for the `start` method with the specific signature and check for `todo!()`
rg --type rust 'fn start\(&mut self, this: ArcRefCellWrapper<Self>\)' -A 5
Length of output: 2646 |
||
|
||
|
@@ -63,6 +63,7 @@ | |
|
||
async fn submit_consume_request( | ||
&self, | ||
this: ArcRefCellWrapper<Self>, | ||
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,8 +57,8 @@ | |
} | ||
|
||
impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService { | ||
fn start(&mut self) { | ||
// nothing to do | ||
fn start(&mut self, this: ArcRefCellWrapper<Self>) { | ||
Check warning on line 60 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs
|
||
//todo!() | ||
Comment on lines
+60
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tip Codebase Verification Complete the The Please implement the Analysis chainImplement the The
Please implement the To ensure consistency across the codebase, let's check for other usages of Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Search for other usages of ArcRefCellWrapper in method signatures
rg --type rust "fn.*ArcRefCellWrapper<.*>" -g '!target/'
Length of output: 686 Script: #!/bin/bash
# Search for all usages of ArcRefCellWrapper in the codebase
rg "ArcRefCellWrapper" -g '!target/'
Length of output: 30256 |
||
} | ||
|
||
fn shutdown(&mut self, await_terminate_millis: u64) { | ||
|
@@ -91,6 +91,7 @@ | |
|
||
async fn submit_consume_request( | ||
&self, | ||
this: ArcRefCellWrapper<Self>, | ||
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,20 +30,20 @@ where | |
T: ConsumeMessageServiceTrait, | ||
K: ConsumeMessageServiceTrait, | ||
{ | ||
pub consume_message_concurrently_service: T, | ||
pub consume_message_pop_concurrently_service: K, | ||
pub consume_message_concurrently_service: ArcRefCellWrapper<T>, | ||
pub consume_message_pop_concurrently_service: ArcRefCellWrapper<K>, | ||
} | ||
pub struct ConsumeMessageOrderlyServiceGeneral<T, K> | ||
where | ||
T: ConsumeMessageServiceTrait, | ||
K: ConsumeMessageServiceTrait, | ||
{ | ||
pub consume_message_orderly_service: T, | ||
pub consume_message_pop_orderly_service: K, | ||
pub consume_message_orderly_service: ArcRefCellWrapper<T>, | ||
pub consume_message_pop_orderly_service: ArcRefCellWrapper<K>, | ||
} | ||
|
||
pub trait ConsumeMessageServiceTrait { | ||
fn start(&mut self); | ||
fn start(&mut self, this: ArcRefCellWrapper<Self>); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reassess passing The If possible, refactor the code to avoid passing |
||
|
||
fn shutdown(&mut self, await_terminate_millis: u64); | ||
|
||
|
@@ -63,6 +63,7 @@ pub trait ConsumeMessageServiceTrait { | |
|
||
async fn submit_consume_request( | ||
&self, | ||
this: ArcRefCellWrapper<Self>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Evaluate the inclusion of The method Consider refactoring the method to use |
||
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -272,31 +272,36 @@ impl DefaultMQPushConsumerImpl { | |||||||||||||||||||||||||||||||||||||||||||||
self.consume_orderly = false; | ||||||||||||||||||||||||||||||||||||||||||||||
self.consume_message_concurrently_service = Some(ArcRefCellWrapper::new( | ||||||||||||||||||||||||||||||||||||||||||||||
ConsumeMessageConcurrentlyServiceGeneral { | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_concurrently_service: | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_concurrently_service: ArcRefCellWrapper::new( | ||||||||||||||||||||||||||||||||||||||||||||||
ConsumeMessageConcurrentlyService::new( | ||||||||||||||||||||||||||||||||||||||||||||||
self.client_config.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
listener.clone().expect("listener is None"), | ||||||||||||||||||||||||||||||||||||||||||||||
self.default_mqpush_consumer_impl.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+275
to
+283
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential panic due to calling In line 275, calling Apply this diff to fix the issue: - listener.clone().expect("listener is None")
+ listener.expect("listener is None").clone() Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
consume_message_pop_concurrently_service: | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_pop_concurrently_service: ArcRefCellWrapper::new( | ||||||||||||||||||||||||||||||||||||||||||||||
ConsumeMessagePopConcurrentlyService::new( | ||||||||||||||||||||||||||||||||||||||||||||||
self.client_config.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||
listener.expect("listener is None"), | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||
} else if message_listener.message_listener_orderly.is_some() { | ||||||||||||||||||||||||||||||||||||||||||||||
self.consume_orderly = true; | ||||||||||||||||||||||||||||||||||||||||||||||
self.consume_message_orderly_service = Some(ArcRefCellWrapper::new( | ||||||||||||||||||||||||||||||||||||||||||||||
ConsumeMessageOrderlyServiceGeneral { | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_orderly_service: ConsumeMessageOrderlyService, | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_pop_orderly_service: | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_orderly_service: ArcRefCellWrapper::new( | ||||||||||||||||||||||||||||||||||||||||||||||
ConsumeMessageOrderlyService, | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_pop_orderly_service: ArcRefCellWrapper::new( | ||||||||||||||||||||||||||||||||||||||||||||||
ConsumeMessagePopOrderlyService, | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+299
to
+304
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing initialization of In line 300, Apply this diff to fix the issue: - ConsumeMessageOrderlyService,
+ ConsumeMessageOrderlyService::new(), Similarly, update the initialization of - ConsumeMessagePopOrderlyService,
+ ConsumeMessagePopOrderlyService::new(),
|
||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -305,23 +310,37 @@ impl DefaultMQPushConsumerImpl { | |||||||||||||||||||||||||||||||||||||||||||||
if let Some(consume_message_concurrently_service) = | ||||||||||||||||||||||||||||||||||||||||||||||
self.consume_message_concurrently_service.as_mut() | ||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||
let this = consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.start(); | ||||||||||||||||||||||||||||||||||||||||||||||
.start(this); | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+313
to
+319
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary cloning and incorrect use of In lines 313-319, cloning the service and passing it to its own Apply this diff to fix the issue: - let this = consume_message_concurrently_service
- .consume_message_concurrently_service
- .clone();
- consume_message_concurrently_service
- .consume_message_concurrently_service
- .start(this);
+ consume_message_concurrently_service
+ .consume_message_concurrently_service
+ .start(); Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
let wrapper = consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_pop_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_pop_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||
.start(); | ||||||||||||||||||||||||||||||||||||||||||||||
.start(wrapper); | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+320
to
+325
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary cloning and incorrect use of In lines 320-325, cloning Apply this diff to fix the issue: - let wrapper = consume_message_concurrently_service
- .consume_message_pop_concurrently_service
- .clone();
- consume_message_concurrently_service
- .consume_message_pop_concurrently_service
- .start(wrapper);
+ consume_message_concurrently_service
+ .consume_message_pop_concurrently_service
+ .start(); Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if let Some(consume_message_orderly_service) = | ||||||||||||||||||||||||||||||||||||||||||||||
self.consume_message_orderly_service.as_mut() | ||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||
let wrapper = consume_message_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.start(); | ||||||||||||||||||||||||||||||||||||||||||||||
.start(wrapper); | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
let wrapper = consume_message_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_pop_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
consume_message_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_pop_orderly_service | ||||||||||||||||||||||||||||||||||||||||||||||
.start(); | ||||||||||||||||||||||||||||||||||||||||||||||
.start(wrapper); | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+331
to
+343
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary cloning and incorrect use of In lines 331-343, cloning service instances and passing them to their own Apply this diff to fix the issue: - let wrapper = consume_message_orderly_service
- .consume_message_orderly_service
- .clone();
- consume_message_orderly_service
- .consume_message_orderly_service
- .start(wrapper);
+ consume_message_orderly_service
+ .consume_message_orderly_service
+ .start();
- let wrapper = consume_message_orderly_service
- .consume_message_pop_orderly_service
- .clone();
- consume_message_orderly_service
- .consume_message_pop_orderly_service
- .start(wrapper);
+ consume_message_orderly_service
+ .consume_message_pop_orderly_service
+ .start(); Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
let consumer_impl = self.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
self.client_instance | ||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM: Improved logging mechanism
Changing from
println!
toinfo!
macro from thetracing
crate is a significant improvement. This provides structured logging capabilities, which are beneficial for log management and analysis. Thetracing
crate is a modern, flexible logging framework for Rust, suitable for asynchronous contexts.Consider adding more context to the log message. For example:
This would provide more detailed information in the logs, making debugging and monitoring easier.
Also applies to: 61-61