-
Notifications
You must be signed in to change notification settings - Fork 161
[ISSUE #1599]🚀Rocketmq-client support Pop consumer-2📝 #1609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,18 +31,46 @@ pub(crate) mod rebalance_service; | |
|
||
#[trait_variant::make(Rebalance: Send)] | ||
pub trait RebalanceLocal { | ||
/// Handles changes in the message queue. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `topic` - The topic for which the message queue has changed. | ||
/// * `mq_all` - A set of all message queues. | ||
/// * `mq_divided` - A set of divided message queues. | ||
async fn message_queue_changed( | ||
&mut self, | ||
topic: &str, | ||
mq_all: &HashSet<MessageQueue>, | ||
mq_divided: &HashSet<MessageQueue>, | ||
); | ||
|
||
/// Removes an unnecessary message queue. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `mq` - The message queue to be removed. | ||
/// * `pq` - The process queue associated with the message queue. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A boolean indicating whether the message queue was removed. | ||
async fn remove_unnecessary_message_queue( | ||
&mut self, | ||
mq: &MessageQueue, | ||
pq: &ProcessQueue, | ||
) -> bool; | ||
|
||
/// Removes an unnecessary pop message queue. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `_mq` - The message queue to be removed. | ||
/// * `_pq` - The pop process queue associated with the message queue. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A boolean indicating whether the pop message queue was removed. | ||
fn remove_unnecessary_pop_message_queue( | ||
&mut self, | ||
_mq: &MessageQueue, | ||
|
@@ -51,35 +79,126 @@ pub trait RebalanceLocal { | |
true | ||
} | ||
|
||
/// Retrieves the consume type. | ||
/// | ||
/// # Returns | ||
/// | ||
/// The consume type. | ||
Comment on lines
+82
to
+86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure All Trait Implementations Provide The |
||
fn consume_type(&self) -> ConsumeType; | ||
|
||
/// Removes a dirty offset. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `mq` - The message queue for which the offset should be removed. | ||
async fn remove_dirty_offset(&mut self, mq: &MessageQueue); | ||
|
||
/// Computes the pull offset with exception handling. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `mq` - The message queue for which the pull offset should be computed. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A result containing the pull offset or an error. | ||
async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result<i64>; | ||
|
||
/// Computes the pull offset. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `mq` - The message queue for which the pull offset should be computed. | ||
/// | ||
/// # Returns | ||
/// | ||
/// The pull offset. | ||
async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64; | ||
|
||
/// Retrieves the consume initialization mode. | ||
/// | ||
/// # Returns | ||
/// | ||
/// The consume initialization mode. | ||
fn get_consume_init_mode(&self) -> i32; | ||
|
||
/// Dispatches pull requests. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `pull_request_list` - A list of pull requests to be dispatched. | ||
/// * `delay` - The delay in milliseconds before dispatching the pull requests. | ||
async fn dispatch_pull_request(&self, pull_request_list: Vec<PullRequest>, delay: u64); | ||
|
||
async fn dispatch_pop_pull_request(&self, pull_request_list: Vec<PopRequest>, delay: u64); | ||
|
||
/// Dispatches pop pull requests. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `pop_request_list` - A list of pop pull requests to be dispatched. | ||
/// * `delay` - The delay in milliseconds before dispatching the pop pull requests. | ||
async fn dispatch_pop_pull_request(&self, pop_request_list: Vec<PopRequest>, delay: u64); | ||
|
||
/// Creates a process queue. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A new process queue. | ||
fn create_process_queue(&self) -> ProcessQueue; | ||
|
||
/// Creates a pop process queue. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A new pop process queue. | ||
fn create_pop_process_queue(&self) -> PopProcessQueue; | ||
|
||
/// Removes a process queue. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `mq` - The message queue for which the process queue should be removed. | ||
async fn remove_process_queue(&mut self, mq: &MessageQueue); | ||
|
||
/// Unlocks a message queue. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `mq` - The message queue to be unlocked. | ||
/// * `oneway` - A boolean indicating if the unlock should be one-way. | ||
async fn unlock(&mut self, mq: &MessageQueue, oneway: bool); | ||
|
||
/// Locks all message queues. | ||
fn lock_all(&self); | ||
|
||
/// Unlocks all message queues. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `oneway` - A boolean indicating if the unlock should be one-way. | ||
fn unlock_all(&self, oneway: bool); | ||
|
||
/// Performs rebalancing. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `is_order` - A boolean indicating if the rebalancing is ordered. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A boolean indicating if the rebalancing was successful. | ||
async fn do_rebalance(&mut self, is_order: bool) -> bool; | ||
|
||
/// Performs client-side rebalancing. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `topic` - The topic for which the rebalancing should be performed. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A boolean indicating if the client-side rebalancing was successful. | ||
fn client_rebalance(&mut self, topic: &str) -> bool; | ||
|
||
/// Destroys the rebalancer. | ||
fn destroy(&mut self); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add Error Handling in
execute_pop_request_immediately
The method
execute_pop_request_immediately
forwards thepop_request
to thepull_message_service
without handling potential errors. Ensure that any errors returned byexecute_pop_pull_request_immediately
are appropriately caught and managed, possibly with logging or retry logic.