[ISSUE #3475]🚀Implement DefaultHAConnection✨ #3477
Walkthrough
This change introduces a new high-availability connection mechanism (
Changes
Sequence Diagram(s)
sequenceDiagram
    participant Master as DefaultHAConnection (Master)
    participant Slave as TCP Socket (Slave)
    participant ReadSvc as ReadSocketService
    participant WriteSvc as WriteSocketService
    participant Flow as FlowMonitor
    Master->>ReadSvc: Start read service (spawn task)
    Master->>WriteSvc: Start write service (spawn task)
    loop While connection active
        Slave-->>ReadSvc: Send slave offset report
        ReadSvc->>Master: Update slave_ack_offset
        WriteSvc->>Slave: Send data batch or heartbeat header
        WriteSvc->>Flow: Update transferred byte count
        alt Heartbeat interval passed
            WriteSvc->>Slave: Send heartbeat header
        end
    end
    Master->>ReadSvc: Shutdown (abort task)
    Master->>WriteSvc: Shutdown (abort task)
    Master->>Slave: Close socket
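The write-side branch in the diagram can be sketched as a small pure function. This is only an illustration: the `WriteAction` enum and `next_write_action` are hypothetical names, and the rule that pending data takes priority over a heartbeat is an assumption, not something the PR states. The 5 000 ms interval matches the `ha_send_heartbeat_interval` default discussed in the review.

```rust
use std::time::Duration;

/// What the write loop sends next (hypothetical enum, for illustration only).
#[derive(Debug, PartialEq)]
enum WriteAction {
    DataBatch,
    Heartbeat,
    Idle,
}

/// Assumed rule: pending data is sent first; otherwise a heartbeat header is
/// sent once `heartbeat_interval` has elapsed since the last write.
fn next_write_action(
    has_pending_data: bool,
    since_last_write: Duration,
    heartbeat_interval: Duration,
) -> WriteAction {
    if has_pending_data {
        WriteAction::DataBatch
    } else if since_last_write >= heartbeat_interval {
        WriteAction::Heartbeat
    } else {
        WriteAction::Idle
    }
}

fn main() {
    let interval = Duration::from_millis(5_000);
    let action = next_write_action(false, Duration::from_millis(6_000), interval);
    println!("{:?}", action);
}
```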
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Pull Request Overview
This pull request implements DefaultHAConnection and enhances HA service functionality. Key changes include:
- Adding an AsRef implementation for ServiceManager to expose its inner service.
- Introducing new methods in FlowMonitor to delegate byte transfer operations via server_manager.
- Adding a placeholder get_default_message_store function and updating HA-related configuration defaults.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
rocketmq/src/task/service_task.rs | Added AsRef implementation for ServiceManager |
rocketmq-store/src/ha/flow_monitor.rs | Added methods to retrieve and manipulate transfer byte metrics |
rocketmq-store/src/ha/default_ha_service.rs | Introduced placeholder for get_default_message_store |
rocketmq-store/src/ha.rs | Registered the new default_ha_connection module |
rocketmq-store/src/config/message_store_config.rs | Updated configuration defaults for HA intervals |
Comments suppressed due to low confidence (1)
rocketmq-store/src/config/message_store_config.rs:275
- [nitpick] Consider adding a comment clarifying that '1000 * 20' represents a time interval in milliseconds to improve code readability for future maintainers.
1000 * 20
    // Add any necessary fields here

    pub fn get_default_message_store(&self) -> &LocalFileMessageStore {
        unimplemented!()
The use of unimplemented!() will cause a runtime panic if this method is ever called. Consider adding documentation to indicate its intended future implementation or provide a safe fallback.
- unimplemented!()
+ panic!("get_default_message_store is not implemented. This method should return a reference to a LocalFileMessageStore. Please implement this method before using it.");
Actionable comments posted: 5
🧹 Nitpick comments (4)
rocketmq-store/src/config/message_store_config.rs (1)
274-279: Use u64-suffixed literals for clarity
The new helpers return u64. The unsuffixed literals in 1000 * 20 and 1000 * 5 are inferred as u64 from the return type (Rust never widens integers implicitly), so the code is correct as written. Adding the suffix still makes the intent obvious and avoids a surprising inferred type if these constants are copied elsewhere.
- pub fn ha_housekeeping_interval() -> u64 {
-     1000 * 20
- }
- pub fn ha_send_heartbeat_interval() -> u64 {
-     1000 * 5
- }
+ pub fn ha_housekeeping_interval() -> u64 {
+     20_000u64 // 20 s
+ }
+ pub fn ha_send_heartbeat_interval() -> u64 {
+     5_000u64 // 5 s
+ }
rocketmq/src/task/service_task.rs (1)
159-163: Dereference the inner T explicitly
&self.service is a &Arc<T>; the compiler applies deref coercion to turn it into &T. Writing the dereference explicitly makes that conversion visible and avoids confusion. The behaviour is unchanged.
- fn as_ref(&self) -> &T {
-     &self.service
+ fn as_ref(&self) -> &T {
+     &*self.service
  }
rocketmq-store/src/ha/default_ha_service.rs (1)
1-32: Duplicate ASF licence header – please keep a single copy
Two identical licence blocks increase file length and risk contradicting future edits.
Remove the second one (lines 17-32) or keep just a short SPDX line.
rocketmq-store/src/ha/default_ha_connection.rs (1)
318-343: Offset parsing discards preceding frames
After reading ≥ 8 bytes you always parse the last 8 bytes and set process_position = 0, silently dropping any earlier pending frames. If the slave batches multiple reports in one TCP packet, only the final one is considered.
Refactor to consume headers in a loop while process_position >= REPORT_HEADER_SIZE, or adopt BytesMut / Buf for framed decoding.
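A minimal sketch of the loop-based variant, using only the standard library rather than BytesMut. It assumes each report is a single big-endian i64 of REPORT_HEADER_SIZE = 8 bytes; the function name `drain_reports` and the wire format are assumptions for illustration, not the PR's actual code.

```rust
/// Size of one slave offset report: a single big-endian i64 (assumed here).
const REPORT_HEADER_SIZE: usize = 8;

/// Decode every complete report in `buf[..filled]`, move any trailing partial
/// report to the front of the buffer, and return the decoded offsets together
/// with the number of leftover bytes.
fn drain_reports(buf: &mut [u8], filled: usize) -> (Vec<i64>, usize) {
    let mut offsets = Vec::new();
    let mut pos = 0;
    while filled - pos >= REPORT_HEADER_SIZE {
        let mut raw = [0u8; REPORT_HEADER_SIZE];
        raw.copy_from_slice(&buf[pos..pos + REPORT_HEADER_SIZE]);
        offsets.push(i64::from_be_bytes(raw));
        pos += REPORT_HEADER_SIZE;
    }
    // Keep the incomplete tail so the next socket read can append to it.
    buf.copy_within(pos..filled, 0);
    (offsets, filled - pos)
}

fn main() {
    let mut buf = [0u8; 32];
    buf[..8].copy_from_slice(&100i64.to_be_bytes());
    buf[8..16].copy_from_slice(&200i64.to_be_bytes());
    buf[16..19].copy_from_slice(&[1, 2, 3]); // partial third report
    let (offsets, leftover) = drain_reports(&mut buf, 19);
    println!("{:?} leftover={}", offsets, leftover);
}
```

The caller would set process_position to the returned leftover count, so a report split across two reads is completed by the next read instead of being dropped.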
📒 Files selected for processing (6)
- rocketmq-store/src/config/message_store_config.rs (3 hunks)
- rocketmq-store/src/ha.rs (1 hunk)
- rocketmq-store/src/ha/default_ha_connection.rs (1 hunk)
- rocketmq-store/src/ha/default_ha_service.rs (1 hunk)
- rocketmq-store/src/ha/flow_monitor.rs (1 hunk)
- rocketmq/src/task/service_task.rs (1 hunk)
🔇 Additional comments (4)
rocketmq-store/src/config/message_store_config.rs (1)
516-521: Serde default functions added – verify all external configurators
The new ha_send_heartbeat_interval and ha_housekeeping_interval fields changed type from usize to u64.
If you have any TOML/YAML/JSON config files or CLI flags that still supply these values as "12345" (implicitly usize), double-check the downstream deserialisation to avoid a silent break at runtime.
No code change required – just a heads-up.
rocketmq-store/src/ha.rs (1)
18-18: Module inclusion looks good
mod default_ha_connection; is correctly added and kept private to the ha crate module – no issues spotted.
rocketmq-store/src/ha/flow_monitor.rs (1)
47-60: Public delegation methods – concise and thread-safe
The new accessors cleanly expose the inner counters and rely on atomics, preserving thread safety. Nice addition.
rocketmq-store/src/ha/default_ha_connection.rs (1)
217-253: Read service’s private channel is also unused
ReadSocketService::new creates its own shutdown_sender, but nothing ever sends or receives on this channel. Remove the field or wire it into a real shutdown signal; otherwise the extra allocation / field is wasted and confuses future maintainers.
    // Add any necessary fields here

    pub fn get_default_message_store(&self) -> &LocalFileMessageStore {
        unimplemented!()
    }
}
🛠️ Refactor suggestion
get_default_message_store cannot be implemented on an empty struct
The method promises &LocalFileMessageStore but DefaultHAService holds no such field, so a safe reference can never be returned.
Either:
- Store an Arc<LocalFileMessageStore> inside the struct and return &self.message_store, or
- Change the signature to return an Arc<LocalFileMessageStore> (or similar ownership).
Leaving it as unimplemented!() will panic at first call and masks the real design gap.
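Both options can be sketched side by side. The `LocalFileMessageStore` below is a stand-in with a made-up `name` field so the example compiles on its own, and `default_message_store_arc` is a hypothetical method name; only the shape of the struct and the two return types follow the suggestion.

```rust
use std::sync::Arc;

/// Stand-in for the real store type; only here so the sketch compiles.
struct LocalFileMessageStore {
    name: String,
}

struct DefaultHAService {
    message_store: Arc<LocalFileMessageStore>,
}

impl DefaultHAService {
    fn new(message_store: Arc<LocalFileMessageStore>) -> Self {
        Self { message_store }
    }

    /// Option 1: borrow the store for as long as `self` is borrowed.
    fn get_default_message_store(&self) -> &LocalFileMessageStore {
        &self.message_store
    }

    /// Option 2: hand out another shared owner (a reference-count bump).
    fn default_message_store_arc(&self) -> Arc<LocalFileMessageStore> {
        Arc::clone(&self.message_store)
    }
}

fn main() {
    let store = Arc::new(LocalFileMessageStore { name: "commit-log".to_string() });
    let service = DefaultHAService::new(Arc::clone(&store));
    println!("borrowed: {}", service.get_default_message_store().name);
    println!("shared:   {}", service.default_message_store_arc().name);
}
```

Option 1 keeps the existing signature. Option 2 suits spawned tasks that must hold the store beyond the lifetime of a borrow.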
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_service.rs around lines 55 to 60, the method
get_default_message_store returns a reference to LocalFileMessageStore but the
struct DefaultHAService has no such field, making it impossible to return a safe
reference. To fix this, add a field like Arc<LocalFileMessageStore> to
DefaultHAService and return a reference to it, or change the method signature to
return an Arc<LocalFileMessageStore> and return the owned value. Remove the
unimplemented!() placeholder to avoid runtime panics.
let mut socket_guard = socket_stream.write().await;
if let Some(ref mut socket) = socket_guard.as_mut() {
    match timeout(
        Duration::from_secs(1),
        socket.read(&mut buffer[process_position..]),
    )
    .await
    {
🛠️ Refactor suggestion
Heavy contention on a single RwLock around the socket
Both read- and write-tasks grab the write side of the same RwLock<Option<TcpStream>> for every I/O call. This serialises all reads and writes (no parallelism, unnecessary context-switching) and may dead-lock if either task stalls while holding the lock.
A simpler & faster pattern:
let (mut reader, mut writer) = socket.into_split(); // returns independent halves
Store each half behind its own Mutex (or none at all) so reads and writes progress concurrently without blocking each other.
Also applies to: 577-591
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs around lines 305 to 312, the
code uses a single RwLock<Option<TcpStream>> for both reading and writing,
causing heavy contention and potential deadlocks. Refactor by splitting the
TcpStream into separate read and write halves using the into_split() method,
then store each half behind its own Mutex or no lock if safe, allowing
concurrent reads and writes without blocking each other. Apply the same pattern
to lines 577 to 591.
// ha_service.increment_connection_count();

let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);

Ok(Self {
shutdown_receiver is created but never used – remove or utilise it
let (shutdown_sender, shutdown_receiver) = … immediately discards shutdown_receiver, triggering an “unused variable” warning and signalling a design hole (the connection has no central shutdown listener).
Either:
-let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
+let (shutdown_sender, _shutdown_receiver) = mpsc::channel(1); // silence warning
or keep the receiver and drive coordinated shutdown through it.
Leaving dead code lowers maintainability and may break #![deny(warnings)] builds.
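The second option, keeping the receiver and driving shutdown through it, might look like the sketch below. To stay self-contained it swaps the PR's async mpsc channel and spawned tasks for std::sync::mpsc and a plain thread; `run_until_shutdown` is a hypothetical name, and the 10 ms poll interval is arbitrary.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Run a worker loop until a shutdown message arrives or the sender is
/// dropped. Returns the number of loop iterations, purely for demonstration.
fn run_until_shutdown(shutdown_receiver: mpsc::Receiver<()>) -> u64 {
    let mut ticks = 0;
    loop {
        // Wait briefly for a shutdown signal; a timeout means "keep working".
        match shutdown_receiver.recv_timeout(Duration::from_millis(10)) {
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return ticks,
            Err(RecvTimeoutError::Timeout) => ticks += 1, // one unit of work
        }
    }
}

fn main() {
    let (shutdown_sender, shutdown_receiver) = mpsc::channel::<()>();
    let worker = thread::spawn(move || run_until_shutdown(shutdown_receiver));
    thread::sleep(Duration::from_millis(50));
    shutdown_sender.send(()).expect("worker is still listening");
    let ticks = worker.join().expect("worker panicked");
    println!("worker stopped after {} ticks", ticks);
}
```

Treating a dropped sender as a shutdown signal means the worker also stops if the owning connection is dropped without an explicit shutdown call.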
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// ha_service.increment_connection_count();
let (shutdown_sender, _shutdown_receiver) = mpsc::channel(1); // silence warning
Ok(Self {
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs around lines 81 to 85, the
variable shutdown_receiver is created but never used, causing an unused variable
warning. To fix this, either remove the shutdown_receiver if it is unnecessary,
or retain it and implement logic to listen for shutdown signals through it to
enable coordinated shutdown. Ensure the code uses shutdown_receiver properly or
eliminates it to avoid dead code and warnings.
if let Some(data) = MessageStore::get_commit_log_data(
    ha_service.get_default_message_store(),
    next_transfer_from_where.load(Ordering::SeqCst),
) {
    //let mut size = data.len();
    let mut size = 0_i32;
    let max_batch_size = message_store_config.ha_transfer_batch_size as i32;
    if size > max_batch_size {
        size = max_batch_size;
    }

    let can_transfer_max = flow_monitor.can_transfer_max_byte_num();
    if size > can_transfer_max {
        size = can_transfer_max;
    }

    let this_offset = next_transfer_from_where.load(Ordering::SeqCst);
    next_transfer_from_where.store(this_offset + size as i64, Ordering::SeqCst);

    let header = Self::build_header(this_offset, size);

    // Transfer header and data
    if Self::transfer_header(&socket_stream, &header, &flow_monitor).await {
        /*last_write_over =
        Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;*/
        last_write_timestamp = Instant::now();
    }
} else {
Data never transferred – core lines commented out
size is forcibly set to 0 and the transfer_data call is commented out, so the write side only pushes heart-beats. Slaves will never receive commit-log bodies.
-// let mut size = data.len();
-let mut size = 0_i32;
+let mut size = data.len() as i32;
...
-/*last_write_over =
-    Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;*/
+last_write_over =
+    Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;
Without this fix HA replication is functionally broken.
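The size clamping around the restored line can be read as "available bytes, capped twice". A small sketch of that arithmetic follows, using usize throughout instead of the i32 casts in the PR; `next_batch_size` is a hypothetical helper, and the numbers in `main` are made up. It assumes the flow-control budget is never negative.

```rust
/// Number of bytes to send in the next batch: the available data, capped by
/// the configured batch size and by the flow-control budget.
fn next_batch_size(available: usize, max_batch_size: usize, can_transfer_max: usize) -> usize {
    available.min(max_batch_size).min(can_transfer_max)
}

fn main() {
    let data = vec![0u8; 50_000];
    let size = next_batch_size(data.len(), 32 * 1024, 40_000);
    let batch = &data[..size]; // slice that would be handed to transfer_data
    println!("sending {} of {} bytes", batch.len(), data.len());
}
```

Working in usize avoids the silent truncation that `data.len() as i32` would cause for buffers larger than i32::MAX bytes.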
📝 Committable suggestion
if let Some(data) = MessageStore::get_commit_log_data(
    ha_service.get_default_message_store(),
    next_transfer_from_where.load(Ordering::SeqCst),
) {
    let mut size = data.len() as i32;
    let max_batch_size = message_store_config.ha_transfer_batch_size as i32;
    if size > max_batch_size {
        size = max_batch_size;
    }
    let can_transfer_max = flow_monitor.can_transfer_max_byte_num();
    if size > can_transfer_max {
        size = can_transfer_max;
    }
    let this_offset = next_transfer_from_where.load(Ordering::SeqCst);
    next_transfer_from_where.store(this_offset + size as i64, Ordering::SeqCst);
    let header = Self::build_header(this_offset, size);
    // Transfer header and data
    if Self::transfer_header(&socket_stream, &header, &flow_monitor).await {
        last_write_over =
            Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;
        last_write_timestamp = Instant::now();
    }
} else {
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs between lines 529 and 556, the
variable 'size' is incorrectly set to 0, and the call to transfer_data is
commented out, preventing actual commit-log data from being transferred and
causing HA replication to fail. To fix this, set 'size' to the length of the
data retrieved from get_commit_log_data, ensure it is capped by max_batch_size
and can_transfer_max_byte_num as before, and uncomment and enable the call to
transfer_data to send the commit-log data after transferring the header.
if next_transfer_from_where.load(Ordering::SeqCst) == -1 {
    let slave_offset = slave_request_offset.load(Ordering::SeqCst);
    let transfer_offset = if slave_offset == 0 {
        /*let master_offset = ha_service
        .get_message_store()::<MS>()
        .get_commit_log()
        .get_max_offset();*/
        let master_offset = 0;
        let mapped_file_size = message_store_config.mapped_file_size_commit_log;
        let aligned_offset = master_offset - (master_offset % mapped_file_size as i64);
        if aligned_offset < 0 {
            0
        } else {
            aligned_offset
        }
    } else {
        slave_offset
    };

    next_transfer_from_where.store(transfer_offset, Ordering::SeqCst);
    info!(
        "master transfer data from {} to slave[{}], and slave request {}",
        transfer_offset, client_address, slave_offset
    );
}
Placeholder master_offset = 0 breaks initial offset calculation
Replication will always start at offset 0, ignoring the real commit-log tail. Replace the placeholder with the actual value:
- let master_offset = 0;
+ let master_offset = ha_service
+     .get_default_message_store()
+     .commit_log_max_offset(); // adjust to real API
Failing to do so may resend gigabytes of data unnecessarily or corrupt offsets on slaves.
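To see why the real tail matters, here is the same alignment arithmetic as two small functions. `aligned_start_offset` and `initial_transfer_offset` are hypothetical names, and the 1 GiB file size in `main` is an assumed value for illustration; the logic mirrors the snippet under review.

```rust
/// Round `master_offset` down to the start of the commit-log file that
/// contains it, never going below zero.
fn aligned_start_offset(master_offset: i64, mapped_file_size: i64) -> i64 {
    assert!(mapped_file_size > 0, "mapped file size must be positive");
    let aligned = master_offset - (master_offset % mapped_file_size);
    aligned.max(0)
}

/// First offset to transfer: a slave reporting 0 starts at the aligned master
/// tail; any other reported offset is honoured as-is.
fn initial_transfer_offset(slave_offset: i64, master_offset: i64, mapped_file_size: i64) -> i64 {
    if slave_offset == 0 {
        aligned_start_offset(master_offset, mapped_file_size)
    } else {
        slave_offset
    }
}

fn main() {
    let mapped_file_size = 1_073_741_824; // 1 GiB, assumed for the example
    let master_offset = 2_500_000_000;
    let start = initial_transfer_offset(0, master_offset, mapped_file_size);
    println!("fresh slave starts at {}", start);
}
```

With the placeholder `master_offset = 0`, the first function always returns 0, which is exactly the behaviour the comment above warns about.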
📝 Committable suggestion
  if next_transfer_from_where.load(Ordering::SeqCst) == -1 {
      let slave_offset = slave_request_offset.load(Ordering::SeqCst);
      let transfer_offset = if slave_offset == 0 {
-         let master_offset = 0;
+         // Fetch the real tail offset from the master’s commit log
+         let master_offset = ha_service
+             .get_default_message_store()
+             .commit_log_max_offset(); // adjust to real API
          let mapped_file_size = message_store_config.mapped_file_size_commit_log;
          let aligned_offset = master_offset - (master_offset % mapped_file_size as i64);
          if aligned_offset < 0 {
              0
          } else {
              aligned_offset
          }
      } else {
          slave_offset
      };
      next_transfer_from_where.store(transfer_offset, Ordering::SeqCst);
      info!(
          "master transfer data from {} to slave[{}], and slave request {}",
          transfer_offset, client_address, slave_offset
      );
  }
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs between lines 484 and 508, the
code currently sets master_offset to 0 as a placeholder, which causes
replication to always start at offset 0 and ignores the actual commit-log tail.
To fix this, replace the placeholder with a call to retrieve the real maximum
offset from the commit log via
ha_service.get_message_store().get_commit_log().get_max_offset(). This ensures
the initial offset calculation uses the correct commit-log tail, preventing
unnecessary data resend or offset corruption on slaves.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3477      +/-   ##
==========================================
- Coverage   26.60%   26.47%   -0.14%
==========================================
  Files         546      547       +1
  Lines       77978    78382     +404
==========================================
  Hits        20749    20749
- Misses      57229    57633     +404
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #3475
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Improvements