-
Notifications
You must be signed in to change notification settings - Fork 159
[ISSUE #3466]♻️Refactor offset handling in default_transactional_message_service.rs for improved clarity and performance🚀 #3467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…age_service.rs for improved clarity and performance🚀
WalkthroughThe changes refactor offset handling in the transactional message service by improving variable naming, updating method signatures, clarifying comments, and correcting offset usage. Additionally, a trait method parameter is renamed for clarity. Parsing logic is hardened to skip empty strings, and offset calculations now consistently use the correct variables. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant DefaultTransactionalMessageService
participant TransactionalMessageCheckListener
Client->>DefaultTransactionalMessageService: process_message_queue(...)
DefaultTransactionalMessageService->>DefaultTransactionalMessageService: Parse and process offsets (half_offset, op_offset)
DefaultTransactionalMessageService->>TransactionalMessageCheckListener: resolve_half_msg(msg_ext)
TransactionalMessageCheckListener-->>DefaultTransactionalMessageService: Result
DefaultTransactionalMessageService-->>Client: Processing complete
Assessment against linked issues
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors offset handling in the transactional message service for improved clarity and performance. Key changes include renaming parameters and indices for clearer intent, updating comments to better reflect the underlying logic, and adjusting offset calculations for both half and op message queues.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
File | Description |
---|---|
rocketmq-broker/src/transaction/transactional_message_check_listener.rs | Renamed a parameter in function documentation for consistency. |
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs | Improved variable naming, refined offset handling logic, and updated comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs (1)
546-556
: 🛠️ Refactor suggestionType mismatch:
next_op_offset
toggles betweenu64
andi64
PullResult::next_begin_offset()
returnsu64
, butfill_op_remove_map
expectsi64
, so every call does anas i64
cast.
The variable itself starts asu64
(line 341) and is later assigned au64
, then cast again.This silent narrowing risks negative values on overflow and is noisy.
Prefer a single signed type throughout:-let mut next_op_offset = pull_result.as_ref().map_or(0, |pr| pr.next_begin_offset()); +let mut next_op_offset: i64 = + pull_result.as_ref().map_or(0, |pr| pr.next_begin_offset() as i64); ... - next_op_offset as i64, + next_op_offset,and drop the other casts.
🧹 Nitpick comments (2)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs (2)
323-335
:process_message_queue
signature updated but docs notThe extra
op_offset
parameter is added here and at the call-site, 👍.
However, the preceding doc-comment (lines 321-322) still documents only four parameters, so IDEs and generated docs are now stale. Please update the comment to listop_offset
with a short description.
338-341
: Redundant offset bookkeeping increases cognitive load
new_offset
,consume_half_offset
, andhalf_offset
are all in play, plus the newop_offset
.
Maintaining bothnew_offset
andconsume_half_offset
that are advanced in lock-step (see later increments) invites future divergence bugs.A leaner variant keeps a single mutable
consume_half_offset
and writes it toupdate_consume_offset
at the end:-let mut new_offset = half_offset; -let mut consume_half_offset = half_offset; +let mut consume_half_offset = half_offset; // single source of truthand drop
new_offset
everywhere else.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs
(18 hunks)rocketmq-broker/src/transaction/transactional_message_check_listener.rs
(2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
rocketmq-broker/src/transaction/transactional_message_check_listener.rs (1)
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (1)
resolve_half_msg
(190-198)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: auto-approve
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build
- GitHub Check: test
🔇 Additional comments (2)
rocketmq-broker/src/transaction/transactional_message_check_listener.rs (1)
67-79
: Parameter is now used – consider aligning mutability of the two callbacks
resolve_half_msg
now takesmsg_ext
(instead of_msg_ext
) and is declared with&self
, whereasresolve_discard_msg
is&mut self
.
If implementations of the trait need to mutate internal state in both paths (discard vs. half-resolution), requiring&mut self
in one method but not the other forces implementers to sprinkle interior-mutability or cloning only forresolve_half_msg
.Unless immutability is intentional (and guaranteed safe for all current/future use-cases), consider:
-async fn resolve_half_msg(&self, msg_ext: MessageExt) -> rocketmq_error::RocketMQResult<()>; +async fn resolve_half_msg(&mut self, msg_ext: MessageExt) -> rocketmq_error::RocketMQResult<()>;This keeps the API symmetrical and avoids hidden
Mutex
/RwLock
cost in downstream impls.rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs (1)
593-599
:calculate_op_offset
may skip intermediate done offsetsThe new call compares
new_op_offset
with the originalop_offset
.
Ifdone_op_offset
contained values< op_offset
,calculate_op_offset
returnsop_offset
unchanged, leaving stale consume-offset.
Consider passing the last consumed op offset (next_op_offset
orpull_result.next_begin_offset()
), or augmentcalculate_op_offset
to iterate from the current broker consume-offset instead of the original snapshot.
let max_msg_offset = if let Some(ref pr) = get_result.pull_result { | ||
pr.max_offset() | ||
} else { | ||
new_offset as u64 | ||
}; | ||
let max_op_offset = if let Some(ref pr) = pull_result { | ||
pr.max_offset() | ||
} else { | ||
new_op_offset as u64 | ||
}; | ||
let msg_time = get_result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mixed signed/unsigned arithmetic can panic in debug builds
max_msg_offset
/ max_op_offset
are u64
, while new_offset
/new_op_offset
are i64
.
(max_op_offset as i64) - new_op_offset
will panic in debug if the casted value exceeds i64::MAX
(possible on large stores).
Safer:
let op_diff = max_op_offset.saturating_sub(new_op_offset as u64);
let msg_diff = max_msg_offset.saturating_sub(new_offset as u64);
and log op_diff
, msg_diff
as u64
.
🤖 Prompt for AI Agents
In
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs
around lines 603 to 613, the code mixes signed i64 and unsigned u64 arithmetic
which can cause panics in debug builds when casting large values. To fix this,
convert new_offset and new_op_offset to u64 before subtraction and use
saturating_sub on max_msg_offset and max_op_offset to safely compute differences
without overflow. Then log these differences as u64 values.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3467 +/- ##
==========================================
- Coverage 26.47% 26.47% -0.01%
==========================================
Files 545 545
Lines 77754 77759 +5
==========================================
Hits 20583 20583
- Misses 57171 57176 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #3466
Brief Description
How Did You Test This Change?
Summary by CodeRabbit