-
Notifications
You must be signed in to change notification settings - Fork 159
[ISSUE #3481]⚡️Enhancement DefaultHAConnection to simplify socket stream handling and improve readability✨ #3482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…eam handling and improve readability✨
WalkthroughThe changes refactor the socket handling in Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant DefaultHAConnection
participant ReadSocketService
participant WriteSocketService
Client->>DefaultHAConnection: new(TcpStream)
DefaultHAConnection->>DefaultHAConnection: split TcpStream into OwnedReadHalf/OwnedWriteHalf
DefaultHAConnection->>ReadSocketService: pass OwnedReadHalf
DefaultHAConnection->>WriteSocketService: pass OwnedWriteHalf
ReadSocketService->>ReadSocketService: read loop (no locking)
WriteSocketService->>WriteSocketService: write loop (no locking)
Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances the DefaultHAConnection by simplifying socket stream handling and improving readability, as well as adding a new placeholder method to the DefaultHAService interface.
- Replace shared RwLock-wrapped TcpStream with an Option and split it into OwnedReadHalf/OwnedWriteHalf for read and write services
- Add a new async method notify_transfer_some in DefaultHAService with an explicit unimplemented message
- Update the socket stream handling in both ReadSocketService and WriteSocketService to pass ownership via Option
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
rocketmq-store/src/ha/default_ha_service.rs | Added a more descriptive unimplemented message and a new notify_transfer_some async method |
rocketmq-store/src/ha/default_ha_connection.rs | Refactored socket_stream handling by removing RwLock, using Option, and updating service creation with OwnedReadHalf/OwnedWriteHalf |
@@ -104,9 +107,12 @@ impl DefaultHAConnection { | |||
// Start flow monitor | |||
self.flow_monitor.start().await; | |||
|
|||
let socket_stream = self.socket_stream.take().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using unwrap() here can lead to a panic if start() is called more than once or if socket_stream is unexpectedly None. It is recommended to handle this case with proper error handling.
let socket_stream = self.socket_stream.take().unwrap(); | |
let socket_stream = match self.socket_stream.take() { | |
Some(stream) => stream, | |
None => { | |
return Err(HAConnectionError::SocketStreamUnavailable( | |
"Socket stream is already taken or not initialized".to_string(), | |
)); | |
} | |
}; |
Copilot uses AI. Check for mistakes.
/*let mut socket_guard = self.socket_stream.write().await; | ||
if let Some(mut socket) = socket_guard.take() { | ||
if let Err(e) = socket.shutdown().await { | ||
error!("Error closing socket: {}", e); | ||
} | ||
} | ||
}*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The socket closing logic is commented out, which could lead to resource leaks if the connection is not properly closed. It is recommended to either remove the commented-out code or implement the socket shutdown logic.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (1)
rocketmq-store/src/ha/default_ha_connection.rs (1)
352-353
: Call into unimplementednotify_transfer_some()
This awaits a function that panics; the read task will terminate immediately.
Defer the call behind a feature flag or finish the implementation before merging.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-store/src/ha/default_ha_connection.rs
(20 hunks)rocketmq-store/src/ha/default_ha_service.rs
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: auto-approve
- GitHub Check: build
- GitHub Check: test
unimplemented!(" get_default_message_store method is not implemented"); | ||
} | ||
|
||
pub async fn notify_transfer_some(&self, _offset: i64) { | ||
// This method is a placeholder for notifying transfer operations. | ||
// The actual implementation would depend on the specific requirements of the HA service. | ||
unimplemented!(" notify_transfer_some method is not implemented"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unimplemented methods are already invoked – will panic at runtime
notify_transfer_some()
and get_default_message_store()
are called by the connection logic added in this PR, but both are still unimplemented!()
.
The first read from a slave will therefore abort the whole HA connection with a panic.
Either wire these methods up now or gate their callers behind a feature-flag / TODO path so the new code can run without crashing.
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_service.rs around lines 58 to 65, the
methods notify_transfer_some() and get_default_message_store() are currently
unimplemented but are called by the connection logic, causing runtime panics. To
fix this, either implement the actual logic for these methods now or modify the
callers to conditionally invoke these methods only when a feature flag is
enabled or during a TODO phase, preventing the panic and allowing the code to
run safely without crashing.
let socket_stream = self.socket_stream.take().unwrap(); | ||
let (reader, writer) = socket_stream.into_split(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
unwrap()
introduces start-time panic on second call
self.socket_stream.take().unwrap()
will panic if start()
is invoked twice (or after a previous failed start).
Return a proper error instead of unwrapping.
-let socket_stream = self.socket_stream.take().unwrap();
+let socket_stream = self
+ .socket_stream
+ .take()
+ .ok_or_else(|| HAConnectionError::Connection("socket already taken".into()))?;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let socket_stream = self.socket_stream.take().unwrap(); | |
let (reader, writer) = socket_stream.into_split(); | |
let socket_stream = self | |
.socket_stream | |
.take() | |
.ok_or_else(|| HAConnectionError::Connection("socket already taken".into()))?; | |
let (reader, writer) = socket_stream.into_split(); |
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs around lines 110 to 112,
replace the use of unwrap() on self.socket_stream.take() with proper error
handling to avoid panics if start() is called multiple times or after a failure.
Check if socket_stream is Some before taking it, and return an appropriate error
if it is None instead of unwrapping.
if buffer.has_remaining_mut() { | ||
buffer.clear(); | ||
process_position = 0; | ||
} | ||
match timeout( | ||
Duration::from_secs(1), | ||
socket.read(&mut buffer[process_position..]), | ||
) | ||
.await | ||
{ | ||
Ok(Ok(bytes_read)) => { | ||
if bytes_read > 0 { | ||
last_read_timestamp = Instant::now(); | ||
// Process the read data | ||
if buffer.len() - process_position >= REPORT_HEADER_SIZE { | ||
//In general, pos is equal to buffer.len(), mainly to handle | ||
// the alignment issue of the buffer. | ||
let pos = buffer.len() - (buffer.len() % REPORT_HEADER_SIZE); | ||
let read_offset = i64::from_be_bytes([ | ||
buffer[pos - 8], | ||
buffer[pos - 7], | ||
buffer[pos - 6], | ||
buffer[pos - 5], | ||
buffer[pos - 4], | ||
buffer[pos - 3], | ||
buffer[pos - 2], | ||
buffer[pos - 1], | ||
]); | ||
process_position = pos; | ||
slave_ack_offset.store(read_offset, Ordering::SeqCst); | ||
if slave_request_offset.load(Ordering::SeqCst) < 0 { | ||
slave_request_offset.store(read_offset, Ordering::SeqCst); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compile-time blocker: invalid buffer handling
Problems inside the read loop:
has_remaining_mut()
does not exist forVec<u8>
– compilation fails.- Calling
buffer.clear()
shrinkslen
to 0, yet the subsequent slicebuffer[process_position..]
assumes a non-zero length → panic if it compiled. - Re-parsing logic relies on
buffer.len()
after it was just cleared.
Consider replacing the whole buffer with bytes::BytesMut
and use its API, or keep a fixed-size Vec
without clearing len
.
Quick minimal fix:
- if buffer.has_remaining_mut() {
- buffer.clear();
- process_position = 0;
- }
+if process_position >= buffer.len() - REPORT_HEADER_SIZE {
+ process_position = 0;
+}
…but switching to BytesMut
is cleaner.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if buffer.has_remaining_mut() { | |
buffer.clear(); | |
process_position = 0; | |
} | |
match timeout( | |
Duration::from_secs(1), | |
socket.read(&mut buffer[process_position..]), | |
) | |
.await | |
{ | |
Ok(Ok(bytes_read)) => { | |
if bytes_read > 0 { | |
last_read_timestamp = Instant::now(); | |
// Process the read data | |
if buffer.len() - process_position >= REPORT_HEADER_SIZE { | |
//In general, pos is equal to buffer.len(), mainly to handle | |
// the alignment issue of the buffer. | |
let pos = buffer.len() - (buffer.len() % REPORT_HEADER_SIZE); | |
let read_offset = i64::from_be_bytes([ | |
buffer[pos - 8], | |
buffer[pos - 7], | |
buffer[pos - 6], | |
buffer[pos - 5], | |
buffer[pos - 4], | |
buffer[pos - 3], | |
buffer[pos - 2], | |
buffer[pos - 1], | |
]); | |
process_position = pos; | |
slave_ack_offset.store(read_offset, Ordering::SeqCst); | |
if slave_request_offset.load(Ordering::SeqCst) < 0 { | |
slave_request_offset.store(read_offset, Ordering::SeqCst); | |
// Minimal fix: reset position instead of clearing the Vec | |
- if buffer.has_remaining_mut() { | |
- buffer.clear(); | |
- process_position = 0; | |
- } | |
+ if process_position >= buffer.len() - REPORT_HEADER_SIZE { | |
+ process_position = 0; | |
+ } | |
match timeout( | |
Duration::from_secs(1), | |
socket.read(&mut buffer[process_position..]), | |
) | |
.await | |
{ | |
Ok(Ok(bytes_read)) => { | |
if bytes_read > 0 { | |
last_read_timestamp = Instant::now(); | |
// Process the read data | |
if buffer.len() - process_position >= REPORT_HEADER_SIZE { | |
//In general, pos is equal to buffer.len(), mainly to handle | |
// the alignment issue of the buffer. | |
let pos = buffer.len() - (buffer.len() % REPORT_HEADER_SIZE); | |
let read_offset = i64::from_be_bytes([ | |
buffer[pos - 8], | |
buffer[pos - 7], | |
buffer[pos - 6], | |
buffer[pos - 5], | |
buffer[pos - 4], | |
buffer[pos - 3], | |
buffer[pos - 2], | |
buffer[pos - 1], | |
]); | |
process_position = pos; | |
slave_ack_offset.store(read_offset, Ordering::SeqCst); | |
if slave_request_offset.load(Ordering::SeqCst) < 0 { | |
slave_request_offset.store(read_offset, Ordering::SeqCst); |
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs around lines 315 to 346, the
code incorrectly uses has_remaining_mut() on a Vec<u8> which does not exist,
clears the buffer causing its length to become zero, and then slices it assuming
a non-zero length, leading to potential panics and compilation errors. To fix
this, replace the Vec<u8> buffer with a bytes::BytesMut buffer to properly
manage capacity and length, use BytesMut's API for checking remaining capacity
and clearing, and adjust the slicing and parsing logic accordingly to avoid
invalid indexing after clearing.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3482 +/- ##
==========================================
- Coverage 26.40% 26.35% -0.06%
==========================================
Files 547 547
Lines 78063 77900 -163
==========================================
- Hits 20616 20532 -84
+ Misses 57447 57368 -79 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #3481
Brief Description
How Did You Test This Change?
Summary by CodeRabbit