Skip to content

[ISSUE #1599]🚀Rocketmq-client support Pop consumer-2📝 #1609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@
.execute_pull_request_later(pull_request, time_delay);
}

pub async fn execute_pop_request_immediately(&mut self, pop_request: PopRequest) {
self.client_instance
.as_mut()
.unwrap()
.pull_message_service
.execute_pop_pull_request_immediately(pop_request)
.await;
}
Comment on lines +703 to +710
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add Error Handling in execute_pop_request_immediately

The method execute_pop_request_immediately forwards the pop_request to the pull_message_service without handling potential errors. Ensure that any errors returned by execute_pop_pull_request_immediately are appropriately caught and managed, possibly with logging or retry logic.

pub fn execute_pop_request_later(&mut self, pop_request: PopRequest, time_delay: u64) {
self.client_instance
.as_mut()
.unwrap()
.pull_message_service
.execute_pop_pull_request_later(pop_request, time_delay);
}

Check warning on line 717 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L703-L717

Added lines #L703 - L717 were not covered by tests

pub(crate) async fn pop_message(&mut self, pop_request: PopRequest) {
unimplemented!("popMessage");
}
Expand Down
123 changes: 121 additions & 2 deletions rocketmq-client/src/consumer/consumer_impl/re_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,46 @@ pub(crate) mod rebalance_service;

#[trait_variant::make(Rebalance: Send)]
pub trait RebalanceLocal {
/// Handles changes in the message queue.
///
/// # Arguments
///
/// * `topic` - The topic for which the message queue has changed.
/// * `mq_all` - A set of all message queues.
/// * `mq_divided` - A set of divided message queues.
async fn message_queue_changed(
&mut self,
topic: &str,
mq_all: &HashSet<MessageQueue>,
mq_divided: &HashSet<MessageQueue>,
);

/// Removes an unnecessary message queue.
///
/// # Arguments
///
/// * `mq` - The message queue to be removed.
/// * `pq` - The process queue associated with the message queue.
///
/// # Returns
///
/// A boolean indicating whether the message queue was removed.
async fn remove_unnecessary_message_queue(
&mut self,
mq: &MessageQueue,
pq: &ProcessQueue,
) -> bool;

/// Removes an unnecessary pop message queue.
///
/// # Arguments
///
/// * `_mq` - The message queue to be removed.
/// * `_pq` - The pop process queue associated with the message queue.
///
/// # Returns
///
/// A boolean indicating whether the pop message queue was removed.
fn remove_unnecessary_pop_message_queue(
&mut self,
_mq: &MessageQueue,
Expand All @@ -51,35 +79,126 @@ pub trait RebalanceLocal {
true
}

/// Retrieves the consume type.
///
/// # Returns
///
/// The consume type.
Comment on lines +82 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure All Trait Implementations Provide consume_type Method

The consume_type method has been added to the RebalanceLocal trait. Verify that all structs implementing this trait now provide an appropriate implementation of consume_type to prevent compile-time errors.

fn consume_type(&self) -> ConsumeType;

/// Removes a dirty offset.
///
/// # Arguments
///
/// * `mq` - The message queue for which the offset should be removed.
async fn remove_dirty_offset(&mut self, mq: &MessageQueue);

/// Computes the pull offset with exception handling.
///
/// # Arguments
///
/// * `mq` - The message queue for which the pull offset should be computed.
///
/// # Returns
///
/// A result containing the pull offset or an error.
async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result<i64>;

/// Computes the pull offset.
///
/// # Arguments
///
/// * `mq` - The message queue for which the pull offset should be computed.
///
/// # Returns
///
/// The pull offset.
async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64;

/// Retrieves the consume initialization mode.
///
/// # Returns
///
/// The consume initialization mode.
fn get_consume_init_mode(&self) -> i32;

/// Dispatches pull requests.
///
/// # Arguments
///
/// * `pull_request_list` - A list of pull requests to be dispatched.
/// * `delay` - The delay in milliseconds before dispatching the pull requests.
async fn dispatch_pull_request(&self, pull_request_list: Vec<PullRequest>, delay: u64);

async fn dispatch_pop_pull_request(&self, pull_request_list: Vec<PopRequest>, delay: u64);

/// Dispatches pop pull requests.
///
/// # Arguments
///
/// * `pop_request_list` - A list of pop pull requests to be dispatched.
/// * `delay` - The delay in milliseconds before dispatching the pop pull requests.
async fn dispatch_pop_pull_request(&self, pop_request_list: Vec<PopRequest>, delay: u64);

/// Creates a process queue.
///
/// # Returns
///
/// A new process queue.
fn create_process_queue(&self) -> ProcessQueue;

/// Creates a pop process queue.
///
/// # Returns
///
/// A new pop process queue.
fn create_pop_process_queue(&self) -> PopProcessQueue;

/// Removes a process queue.
///
/// # Arguments
///
/// * `mq` - The message queue for which the process queue should be removed.
async fn remove_process_queue(&mut self, mq: &MessageQueue);

/// Unlocks a message queue.
///
/// # Arguments
///
/// * `mq` - The message queue to be unlocked.
/// * `oneway` - A boolean indicating if the unlock should be one-way.
async fn unlock(&mut self, mq: &MessageQueue, oneway: bool);

/// Locks all message queues.
fn lock_all(&self);

/// Unlocks all message queues.
///
/// # Arguments
///
/// * `oneway` - A boolean indicating if the unlock should be one-way.
fn unlock_all(&self, oneway: bool);

/// Performs rebalancing.
///
/// # Arguments
///
/// * `is_order` - A boolean indicating if the rebalancing is ordered.
///
/// # Returns
///
/// A boolean indicating if the rebalancing was successful.
async fn do_rebalance(&mut self, is_order: bool) -> bool;

/// Performs client-side rebalancing.
///
/// # Arguments
///
/// * `topic` - The topic for which the rebalancing should be performed.
///
/// # Returns
///
/// A boolean indicating if the client-side rebalancing was successful.
fn client_rebalance(&mut self, topic: &str) -> bool;

/// Destroys the rebalancer.
fn destroy(&mut self);
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,33 @@
}
}

async fn dispatch_pop_pull_request(&self, pull_request_list: Vec<PopRequest>, delay: u64) {
todo!()
async fn dispatch_pop_pull_request(&self, pop_request_list: Vec<PopRequest>, delay: u64) {
let mqpush_consumer_impl = self
.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade();
if mqpush_consumer_impl.is_none() {
return;
}
let mut mqpush_consumer_impl = mqpush_consumer_impl.unwrap();
for pop_request in pop_request_list {
if delay == 0 {
mqpush_consumer_impl
.execute_pop_request_immediately(pop_request)
.await;
} else {
mqpush_consumer_impl.execute_pop_request_later(pop_request, delay);
}

Check warning on line 430 in rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs#L413-L430

Added lines #L413 - L430 were not covered by tests
}
}

#[inline]
fn create_process_queue(&self) -> ProcessQueue {
ProcessQueue::new()
}

#[inline]
fn create_pop_process_queue(&self) -> PopProcessQueue {
PopProcessQueue::new()
}
Expand Down
Loading