diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index b766cd186..1634bbb97 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -700,6 +700,22 @@ impl DefaultMQPushConsumerImpl { .execute_pull_request_later(pull_request, time_delay); } + pub async fn execute_pop_request_immediately(&mut self, pop_request: PopRequest) { + self.client_instance + .as_mut() + .unwrap() + .pull_message_service + .execute_pop_pull_request_immediately(pop_request) + .await; + } + pub fn execute_pop_request_later(&mut self, pop_request: PopRequest, time_delay: u64) { + self.client_instance + .as_mut() + .unwrap() + .pull_message_service + .execute_pop_pull_request_later(pop_request, time_delay); + } + pub(crate) async fn pop_message(&mut self, pop_request: PopRequest) { unimplemented!("popMessage"); } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs index 79ba9e6a9..3c5da5af3 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs @@ -31,18 +31,46 @@ pub(crate) mod rebalance_service; #[trait_variant::make(Rebalance: Send)] pub trait RebalanceLocal { + /// Handles changes in the message queue. + /// + /// # Arguments + /// + /// * `topic` - The topic for which the message queue has changed. + /// * `mq_all` - A set of all message queues. + /// * `mq_divided` - A set of divided message queues. async fn message_queue_changed( &mut self, topic: &str, mq_all: &HashSet, mq_divided: &HashSet, ); + + /// Removes an unnecessary message queue. + /// + /// # Arguments + /// + /// * `mq` - The message queue to be removed. + /// * `pq` - The process queue associated with the message queue. + /// + /// # Returns + /// + /// A boolean indicating whether the message queue was removed. async fn remove_unnecessary_message_queue( &mut self, mq: &MessageQueue, pq: &ProcessQueue, ) -> bool; + /// Removes an unnecessary pop message queue. + /// + /// # Arguments + /// + /// * `_mq` - The message queue to be removed. + /// * `_pq` - The pop process queue associated with the message queue. + /// + /// # Returns + /// + /// A boolean indicating whether the pop message queue was removed. fn remove_unnecessary_pop_message_queue( &mut self, _mq: &MessageQueue, @@ -51,35 +79,126 @@ pub trait RebalanceLocal { true } + /// Retrieves the consume type. + /// + /// # Returns + /// + /// The consume type. fn consume_type(&self) -> ConsumeType; + /// Removes a dirty offset. + /// + /// # Arguments + /// + /// * `mq` - The message queue for which the offset should be removed. async fn remove_dirty_offset(&mut self, mq: &MessageQueue); + /// Computes the pull offset with exception handling. + /// + /// # Arguments + /// + /// * `mq` - The message queue for which the pull offset should be computed. + /// + /// # Returns + /// + /// A result containing the pull offset or an error. async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result; + /// Computes the pull offset. + /// + /// # Arguments + /// + /// * `mq` - The message queue for which the pull offset should be computed. + /// + /// # Returns + /// + /// The pull offset. async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64; + /// Retrieves the consume initialization mode. + /// + /// # Returns + /// + /// The consume initialization mode. fn get_consume_init_mode(&self) -> i32; + /// Dispatches pull requests. + /// + /// # Arguments + /// + /// * `pull_request_list` - A list of pull requests to be dispatched. + /// * `delay` - The delay in milliseconds before dispatching the pull requests. async fn dispatch_pull_request(&self, pull_request_list: Vec, delay: u64); - async fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: u64); - + /// Dispatches pop pull requests. + /// + /// # Arguments + /// + /// * `pop_request_list` - A list of pop pull requests to be dispatched. + /// * `delay` - The delay in milliseconds before dispatching the pop pull requests. + async fn dispatch_pop_pull_request(&self, pop_request_list: Vec, delay: u64); + + /// Creates a process queue. + /// + /// # Returns + /// + /// A new process queue. fn create_process_queue(&self) -> ProcessQueue; + /// Creates a pop process queue. + /// + /// # Returns + /// + /// A new pop process queue. fn create_pop_process_queue(&self) -> PopProcessQueue; + /// Removes a process queue. + /// + /// # Arguments + /// + /// * `mq` - The message queue for which the process queue should be removed. async fn remove_process_queue(&mut self, mq: &MessageQueue); + /// Unlocks a message queue. + /// + /// # Arguments + /// + /// * `mq` - The message queue to be unlocked. + /// * `oneway` - A boolean indicating if the unlock should be one-way. async fn unlock(&mut self, mq: &MessageQueue, oneway: bool); + /// Locks all message queues. fn lock_all(&self); + /// Unlocks all message queues. + /// + /// # Arguments + /// + /// * `oneway` - A boolean indicating if the unlock should be one-way. fn unlock_all(&self, oneway: bool); + /// Performs rebalancing. + /// + /// # Arguments + /// + /// * `is_order` - A boolean indicating if the rebalancing is ordered. + /// + /// # Returns + /// + /// A boolean indicating if the rebalancing was successful. async fn do_rebalance(&mut self, is_order: bool) -> bool; + /// Performs client-side rebalancing. + /// + /// # Arguments + /// + /// * `topic` - The topic for which the rebalancing should be performed. + /// + /// # Returns + /// + /// A boolean indicating if the client-side rebalancing was successful. fn client_rebalance(&mut self, topic: &str) -> bool; + /// Destroys the rebalancer. fn destroy(&mut self); } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index b356c0d3f..b0fd9ef49 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -410,8 +410,25 @@ impl Rebalance for RebalancePushImpl { } } - async fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: u64) { - todo!() + async fn dispatch_pop_pull_request(&self, pop_request_list: Vec, delay: u64) { + let mqpush_consumer_impl = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade(); + if mqpush_consumer_impl.is_none() { + return; + } + let mut mqpush_consumer_impl = mqpush_consumer_impl.unwrap(); + for pop_request in pop_request_list { + if delay == 0 { + mqpush_consumer_impl + .execute_pop_request_immediately(pop_request) + .await; + } else { + mqpush_consumer_impl.execute_pop_request_later(pop_request, delay); + } + } } #[inline] @@ -419,6 +436,7 @@ impl Rebalance for RebalancePushImpl { ProcessQueue::new() } + #[inline] fn create_pop_process_queue(&self) -> PopProcessQueue { PopProcessQueue::new() }