-
Notifications
You must be signed in to change notification settings - Fork 159
[ISSUE #3461]🚀Refactor shutdown methods to be asynchronous for improved performance and responsiveness✨ #3462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ed performance and responsiveness✨
WalkthroughThe changes refactor the shutdown methods for transactional message services in the RocketMQ broker to be asynchronous. Method signatures and implementations are updated to support async/await, ensuring that shutdown operations for transactional components are properly awaited and executed asynchronously. Changes
Sequence Diagram(s)sequenceDiagram
participant BrokerRuntime
participant TransactionalMessageService
participant TransactionalOpBatchService
BrokerRuntime->>TransactionalMessageService: shutdown().await
TransactionalMessageService->>TransactionalMessageService: close().await
alt If batch service present
TransactionalMessageService->>TransactionalOpBatchService: shutdown().await
TransactionalOpBatchService->>TransactionalOpBatchService: service_manager.shutdown().await
end
Assessment against linked issues
Assessment against linked issues: Out-of-scope changesNo out-of-scope changes found. Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Refactor shutdown methods to be asynchronous in broker services for improved performance and responsiveness.
- Change
close()
inTransactionalMessageServiceLocal
to an async method. - Add
shutdown()
as an async method inTransactionalOpBatchService
and implement it inDefaultTransactionalMessageService
. - Update broker startup/shutdown logic in
BrokerRuntime
to await the new async shutdown methods.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
rocketmq-broker/src/transaction/transactional_message_service.rs | Converted close() to async fn close() in the service trait |
rocketmq-broker/src/transaction/queue/transactional_op_batch_service.rs | Added pub async fn shutdown() calling service_manager.shutdown() |
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs | Implemented async shutdown() and async fn close() to await batch service shutdown |
rocketmq-broker/src/broker_runtime.rs | Updated broker shutdown to await transactional_message_service.shutdown() |
Comments suppressed due to low confidence (2)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs:226
- The trait defines an async close() method but the implementation exposes shutdown(); consider unifying method names (either rename close to shutdown in the trait or vice versa) for a consistent API.
pub async fn shutdown(&mut self) {
rocketmq-broker/src/transaction/transactional_message_service.rs:111
- Rust traits do not support async functions directly; you should annotate this trait with #[async_trait] or change close to return a boxed Future to ensure compatibility.
async fn close(&self);
@@ -59,6 +59,10 @@ where | |||
self.service_manager.start().await.unwrap(); | |||
} | |||
|
|||
pub async fn shutdown(&self) { | |||
self.service_manager.shutdown().await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Using unwrap() on a shutdown result can cause a panic; consider handling errors gracefully or logging failures instead of unwrapping.
self.service_manager.shutdown().await.unwrap(); | |
if let Err(e) = self.service_manager.shutdown().await { | |
warn!("Failed to shut down service manager: {:?}", e); | |
} |
Copilot uses AI. Check for mistakes.
@@ -317,7 +317,7 @@ impl BrokerRuntime { | |||
if let Some(transactional_message_service) = | |||
self.inner.transactional_message_service.as_mut() | |||
{ | |||
transactional_message_service.shutdown(); | |||
transactional_message_service.shutdown().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Ignoring potential errors from shutdown() might mask failures; consider handling or logging errors returned by shutdown() instead of silently awaiting.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (1)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs (1)
226-228
: Redundant publicshutdown
wrapper—consider consolidating with trait‐levelclose()
shutdown()
is only delegating toself.close().await
, so the repo now exposes two names for the same concern (close
andshutdown
). This increases the cognitive load for callers and makes the public surface inconsistent with theTransactionalMessageService
trait, which already definesclose()
.If an explicit mutable‐borrow variant is needed (e.g. to
take()
the option – see next comment), consider:
- Moving the resource‐cleanup logic into
shutdown(&mut self)
and droppingclose()
from the trait, or- Keeping only
close()
and having external code call it directly.Either way keeps a single authoritative shutdown entry-point.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
rocketmq-broker/src/broker_runtime.rs
(1 hunks)rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs
(2 hunks)rocketmq-broker/src/transaction/queue/transactional_op_batch_service.rs
(1 hunks)rocketmq-broker/src/transaction/transactional_message_service.rs
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
rocketmq-broker/src/transaction/transactional_message_service.rs (1)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs (1)
close
(1128-1132)
rocketmq-broker/src/transaction/queue/transactional_op_batch_service.rs (2)
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs (1)
shutdown
(226-228)rocketmq-broker/src/broker_runtime.rs (1)
shutdown
(251-278)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: auto-approve
- GitHub Check: test
- GitHub Check: build
🔇 Additional comments (1)
rocketmq-broker/src/transaction/transactional_message_service.rs (1)
110-112
: Trait change is breaking – ensure all implementors are updated & re-exported
close
is nowasync
, which alters the autogeneratedtrait_variant
surface.
Double-check that every downstream implementation has the new signature and that
public re-exports (pub use …
) are updated to avoid semver regressions.
pub async fn shutdown(&self) { | ||
self.service_manager.shutdown().await.unwrap(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid panicking during shutdown – handle the Result
instead of unwrapping
A failure during shutdown should not bring the whole broker down.
Replacing unwrap()
with graceful error handling (log & return) will keep the node responsive even if the inner service fails.
- pub async fn shutdown(&self) {
- self.service_manager.shutdown().await.unwrap();
+ pub async fn shutdown(&self) {
+ if let Err(e) = self.service_manager.shutdown().await {
+ tracing::error!("TransactionalOpBatchService shutdown failed: {e:?}");
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub async fn shutdown(&self) { | |
self.service_manager.shutdown().await.unwrap(); | |
} | |
pub async fn shutdown(&self) { | |
if let Err(e) = self.service_manager.shutdown().await { | |
tracing::error!("TransactionalOpBatchService shutdown failed: {e:?}"); | |
} | |
} |
🤖 Prompt for AI Agents
In rocketmq-broker/src/transaction/queue/transactional_op_batch_service.rs at
lines 62 to 64, the shutdown method currently uses unwrap() on the result of
service_manager.shutdown(), which can cause a panic if shutdown fails. Replace
unwrap() with proper error handling by matching on the Result, logging any error
encountered, and returning gracefully without panicking to ensure the broker
remains responsive even if shutdown encounters issues.
if let Some(transactional_message_service) = | ||
self.inner.transactional_message_service.as_mut() | ||
{ | ||
transactional_message_service.shutdown(); | ||
transactional_message_service.shutdown().await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Hold-and-await on a &mut
borrow risks future borrow-checker pain
self.inner.transactional_message_service.as_mut()
yields a mutable borrow that is
held across the .await
.
While this compiles today, it prevents any code inside shutdown()
from
re-borrowing self.inner
(directly or indirectly) and makes subsequent
refactors harder.
Consider taking ownership (or a clone) first and releasing the borrow before
awaiting:
if let Some(service) = self.inner.transactional_message_service.take() {
service.shutdown().await;
}
This pattern also guarantees the field is None
afterwards, so shutdown is
performed exactly once.
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 317 to 321, the code holds
a mutable borrow of self.inner.transactional_message_service across an await,
which can cause borrow-checker issues. To fix this, take ownership of the
transactional_message_service by using .take() to replace the Option with None
before awaiting shutdown, ensuring the mutable borrow is released before the
await and that shutdown is called exactly once.
async fn close(&self) { | ||
if let Some(batch_service) = &self.transactional_op_batch_service { | ||
batch_service.shutdown().await | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
transactional_op_batch_service
is never dropped – potential resource leak
close(&self)
calls batch_service.shutdown().await
but retains the Some(...)
value inside the Option
, keeping any internal state and allocations alive for the remainder of the broker’s lifetime.
Because close
only borrows &self
, it cannot take()
the value.
A minimal, non-breaking refactor is to move the ownership-dropping logic into the shutdown(&mut self)
wrapper where a mutable borrow already exists:
-pub async fn shutdown(&mut self) {
- self.close().await
+pub async fn shutdown(&mut self) {
+ if let Some(batch_service) = self.transactional_op_batch_service.take() {
+ batch_service.shutdown().await;
+ }
}
This:
• Ensures the service is dropped after its async shutdown completes.
• Prevents accidental double-shutdowns.
• Frees memory and wakes any Weak
references earlier.
If you keep both entry-points, make close()
simply call the new shutdown(&mut self)
via async { … }
to centralise the logic.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs
around lines 1128 to 1132, the close(&self) method calls shutdown on
transactional_op_batch_service but does not drop it, causing a potential
resource leak. Refactor by moving the shutdown and dropping logic into a new
shutdown(&mut self) method where you have mutable access, allowing you to take()
the Option and drop it after awaiting shutdown. Then, have close() call this
shutdown(&mut self) asynchronously to centralize the shutdown and dropping
logic, ensuring the service is properly dropped and resources freed.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3462 +/- ##
==========================================
- Coverage 26.47% 26.47% -0.01%
==========================================
Files 545 545
Lines 77749 77754 +5
==========================================
Hits 20583 20583
- Misses 57166 57171 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #3461
Brief Description
How Did You Test This Change?
Summary by CodeRabbit