
[ISSUE #3475]🚀Implement DefaultHAConnection✨ #3477


Merged
merged 1 commit into main on Jun 17, 2025

Conversation


@mxsm mxsm commented Jun 17, 2025

Which Issue(s) This PR Fixes (Closes)

Fixes #3475

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced a high-availability (HA) connection mechanism with asynchronous read/write services, heartbeat management, and flow monitoring for improved data transfer reliability.
    • Added new configuration options for HA heartbeat and housekeeping intervals with defined default values.
    • Exposed methods to access transfer metrics and control flow in the HA flow monitor.
  • Improvements

    • Enhanced configuration consistency for HA intervals and improved type safety.
    • Added utility methods for accessing and managing internal service references.

@Copilot Copilot AI review requested due to automatic review settings June 17, 2025 15:38

coderabbitai bot commented Jun 17, 2025

Walkthrough

This change introduces a new high-availability connection mechanism (DefaultHAConnection) for RocketMQ's message store, including its read and write socket services, flow monitoring, and heartbeat management. Supporting configuration and flow monitor methods were added, and a placeholder for accessing the default message store was introduced. Minor trait implementations and module declarations were also updated.

Changes

File(s) Change Summary
rocketmq-store/src/ha/default_ha_connection.rs New file: Implements DefaultHAConnection, read/write services, error types, and async HA logic.
rocketmq-store/src/ha.rs Declared new module: default_ha_connection.
rocketmq-store/src/ha/flow_monitor.rs Added public methods to FlowMonitor for byte transfer stats and flow control.
rocketmq-store/src/config/message_store_config.rs Added default interval functions; changed config types and defaults for HA intervals.
rocketmq-store/src/ha/default_ha_service.rs Added placeholder method get_default_message_store to DefaultHAService.
rocketmq/src/task/service_task.rs Added AsRef<T> implementation for ServiceManager<T>.

Sequence Diagram(s)

sequenceDiagram
    participant Master as DefaultHAConnection (Master)
    participant Slave as TCP Socket (Slave)
    participant ReadSvc as ReadSocketService
    participant WriteSvc as WriteSocketService
    participant Flow as FlowMonitor

    Master->>ReadSvc: Start read service (spawn task)
    Master->>WriteSvc: Start write service (spawn task)
    loop While connection active
        Slave-->>ReadSvc: Send slave offset report
        ReadSvc->>Master: Update slave_ack_offset
        WriteSvc->>Slave: Send data batch or heartbeat header
        WriteSvc->>Flow: Update transferred byte count
        alt Heartbeat interval passed
            WriteSvc->>Slave: Send heartbeat header
        end
    end
    Master->>ReadSvc: Shutdown (abort task)
    Master->>WriteSvc: Shutdown (abort task)
    Master->>Slave: Close socket
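
For orientation, the struct behind this flow might look roughly like the sketch below; the field names, tokio primitives, and shutdown wiring are assumptions for illustration, not the PR's actual code.

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use tokio::net::TcpStream;
use tokio::sync::{mpsc, RwLock};
use tokio::task::JoinHandle;

/// Hypothetical skeleton of a master-side HA connection: one task reads
/// slave offset reports, another pushes commit-log batches and heartbeats.
#[allow(dead_code)]
pub struct HaConnectionSketch {
    socket: Arc<RwLock<Option<TcpStream>>>,
    client_address: String,
    slave_request_offset: Arc<AtomicI64>,
    slave_ack_offset: Arc<AtomicI64>,
    shutdown_tx: mpsc::Sender<()>,
    read_task: Option<JoinHandle<()>>,
    write_task: Option<JoinHandle<()>>,
}

impl HaConnectionSketch {
    /// Signals both background tasks to stop; real code would also close the socket.
    pub async fn shutdown(&mut self) {
        let _ = self.shutdown_tx.send(()).await;
        if let Some(handle) = self.read_task.take() {
            handle.abort();
        }
        if let Some(handle) = self.write_task.take() {
            handle.abort();
        }
    }
}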

Assessment against linked issues

Objective Addressed Explanation
Implement DefaultHAConnection with async read/write services, flow monitoring, heartbeat, and shutdown (#3475)
Provide configuration for HA intervals and flow monitor access (#3475)
Expose placeholder for accessing default message store (#3475)

Assessment against linked issues: Out-of-scope changes

Code Change Explanation
Implementation of AsRef<T> for ServiceManager<T> (rocketmq/src/task/service_task.rs) This trait implementation is a general utility and not directly related to the DefaultHAConnection feature.
Addition of placeholder method get_default_message_store (rocketmq-store/src/ha/default_ha_service.rs) The method is currently unimplemented and not directly required for the DefaultHAConnection feature.

Poem

In burrows deep, connections grow,
With heartbeats sent and bytes that flow.
The master listens, the slave replies,
As data hops and time flies.
Rockets launch, high-availability’s in sight—
This rabbit’s code now feels just right! 🐇🚀


@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 (Suggest an idea for this project.) label on Jun 17, 2025
@rocketmq-rust-bot

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.


@Copilot Copilot AI left a comment


Pull Request Overview

This pull request implements DefaultHAConnection and enhances HA service functionality. Key changes include:

  • Adding an AsRef implementation for ServiceManager to expose its inner service.
  • Introducing new methods in FlowMonitor to delegate byte transfer operations via server_manager.
  • Adding a placeholder get_default_message_store function and updating HA-related configuration defaults.

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
rocketmq/src/task/service_task.rs Added AsRef implementation for ServiceManager
rocketmq-store/src/ha/flow_monitor.rs Added methods to retrieve and manipulate transfer byte metrics
rocketmq-store/src/ha/default_ha_service.rs Introduced placeholder for get_default_message_store
rocketmq-store/src/ha.rs Registered the new default_ha_connection module
rocketmq-store/src/config/message_store_config.rs Updated configuration defaults for HA intervals
Comments suppressed due to low confidence (1)

rocketmq-store/src/config/message_store_config.rs:275

  • [nitpick] Consider adding a comment clarifying that '1000 * 20' represents a time interval in milliseconds to improve code readability for future maintainers.
        1000 * 20

Commented code (rocketmq-store/src/ha/default_ha_service.rs):

// Add any necessary fields here

pub fn get_default_message_store(&self) -> &LocalFileMessageStore {
    unimplemented!()
}
Copilot AI Jun 17, 2025

The use of unimplemented!() will cause a runtime panic if this method is ever called. Consider adding documentation to indicate its intended future implementation or provide a safe fallback.

Suggested change
unimplemented!()
panic!("get_default_message_store is not implemented. This method should return a reference to a LocalFileMessageStore. Please implement this method before using it.");



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (4)
rocketmq-store/src/config/message_store_config.rs (1)

274-279: Use u64-suffixed literals for clarity and to avoid implicit type coercion

The new helpers return u64, but the literal arithmetic (1000 * 20, 1000 * 5) is still inferred from unsuffixed i32 values and then widened.
A one-time coercion is legal, yet adding the suffix makes the intent obvious and prevents accidental narrowing if these constants are copied elsewhere.

-    pub fn ha_housekeeping_interval() -> u64 {
-        1000 * 20
-    }
-    pub fn ha_send_heartbeat_interval() -> u64 {
-        1000 * 5
-    }
+    pub fn ha_housekeeping_interval() -> u64 {
+        20_000u64 // 20 s
+    }
+    pub fn ha_send_heartbeat_interval() -> u64 {
+        5_000u64  // 5 s
+    }
rocketmq/src/task/service_task.rs (1)

159-163: Return the inner T directly to avoid an extra indirection

&self.service gives a &Arc<T>; the compiler currently performs a double‐deref to convert this to &T, but being explicit avoids confusion and one needless Deref hop.

-    fn as_ref(&self) -> &T {
-        &self.service
+    fn as_ref(&self) -> &T {
+        &*self.service
     }
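
A self-contained toy showing the distinction; Manager here is a stand-in for ServiceManager<T>, not the real type.

use std::sync::Arc;

// Toy stand-in for ServiceManager<T>: the inner service lives behind an Arc.
struct Manager<T> {
    service: Arc<T>,
}

impl<T> AsRef<T> for Manager<T> {
    fn as_ref(&self) -> &T {
        // `&self.service` is `&Arc<T>` and only becomes `&T` through deref
        // coercion; `&*self.service` performs that deref explicitly.
        &*self.service
    }
}

fn main() {
    let manager = Manager { service: Arc::new(String::from("inner")) };
    let inner: &String = manager.as_ref();
    assert_eq!(inner, "inner");
}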
rocketmq-store/src/ha/default_ha_service.rs (1)

1-32: Duplicate ASF licence header – please keep a single copy

Two identical licence blocks increase file length and risk contradicting future edits.
Remove the second one (lines 17-32) or keep just a short SPDX line.
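
If the shorter form is preferred, the duplicated block could collapse to a single identifier comment such as the following (illustrative only; the project may instead keep one full ASF header):

// SPDX-License-Identifier: Apache-2.0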

rocketmq-store/src/ha/default_ha_connection.rs (1)

318-343: Offset parsing discards preceding frames

After reading ≥ 8 bytes you always parse the last 8 bytes and process_position = 0, silently dropping any earlier pending frames. If the slave batches multiple reports in one TCP packet only the final one is considered.

Refactor to consume headers in a loop while process_position >= REPORT_HEADER_SIZE, or adopt BytesMut/Buf for framed decoding.
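
A rough sketch of the loop-based framing that suggestion describes; REPORT_HEADER_SIZE, the byte order, and the buffer handling are assumptions, not the file's actual definitions.

// Assumed report frame: one big-endian i64 slave offset per 8-byte header.
const REPORT_HEADER_SIZE: usize = 8;

/// Consumes every complete report in `buffer[..filled]`, returning the last
/// reported offset (if any) and how many leftover bytes were kept for the
/// next read (moved to the front of the buffer).
fn drain_reports(buffer: &mut [u8], filled: usize) -> (Option<i64>, usize) {
    let mut pos = 0;
    let mut last_offset = None;
    while filled - pos >= REPORT_HEADER_SIZE {
        let mut raw = [0u8; REPORT_HEADER_SIZE];
        raw.copy_from_slice(&buffer[pos..pos + REPORT_HEADER_SIZE]);
        last_offset = Some(i64::from_be_bytes(raw));
        pos += REPORT_HEADER_SIZE;
    }
    // Preserve any partial frame so the next socket read can complete it.
    buffer.copy_within(pos..filled, 0);
    (last_offset, filled - pos)
}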

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bce8a82 and 2dca1a9.

📒 Files selected for processing (6)
  • rocketmq-store/src/config/message_store_config.rs (3 hunks)
  • rocketmq-store/src/ha.rs (1 hunks)
  • rocketmq-store/src/ha/default_ha_connection.rs (1 hunks)
  • rocketmq-store/src/ha/default_ha_service.rs (1 hunks)
  • rocketmq-store/src/ha/flow_monitor.rs (1 hunks)
  • rocketmq/src/task/service_task.rs (1 hunks)
🔇 Additional comments (4)
rocketmq-store/src/config/message_store_config.rs (1)

516-521: Serde default functions added – verify all external configurators

The new ha_send_heartbeat_interval and ha_housekeeping_interval fields changed type from usize to u64.
If you have any TOML/YAML/JSON config files or CLI flags that still supply these values as "12345" (implicitly usize), double-check the downstream deserialisation to avoid a silent break at runtime.
No code change required – just a heads-up.
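
For illustration, the serde wiring these defaults typically attach to looks roughly like this; the struct is a trimmed stand-in, not the real MessageStoreConfig.

use serde::Deserialize;

fn ha_send_heartbeat_interval() -> u64 {
    1000 * 5 // 5 s, in milliseconds
}

fn ha_housekeeping_interval() -> u64 {
    1000 * 20 // 20 s, in milliseconds
}

// A bare integer in TOML/YAML/JSON deserializes into u64 the same way it did
// into usize, so well-formed config files keep working unchanged.
#[derive(Debug, Deserialize)]
struct HaIntervalsSketch {
    #[serde(default = "ha_send_heartbeat_interval")]
    ha_send_heartbeat_interval: u64,
    #[serde(default = "ha_housekeeping_interval")]
    ha_housekeeping_interval: u64,
}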

rocketmq-store/src/ha.rs (1)

18-18: Module inclusion looks good

mod default_ha_connection; is correctly added and kept private to the ha crate module – no issues spotted.

rocketmq-store/src/ha/flow_monitor.rs (1)

47-60: Public delegation methods – concise and thread-safe

The new accessors cleanly expose the inner counters and rely on atomics, preserving thread safety. Nice addition.
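
As a rough illustration of that pattern (names and field layout are invented, not FlowMonitor's real API):

use std::sync::atomic::{AtomicI64, Ordering};

// Invented counter type; FlowMonitor's real state is organised differently.
struct TransferStats {
    transferred_bytes: AtomicI64,
    max_bytes_per_second: AtomicI64,
}

impl TransferStats {
    fn add_transferred_bytes(&self, n: i64) {
        self.transferred_bytes.fetch_add(n, Ordering::AcqRel);
    }

    fn transferred_bytes(&self) -> i64 {
        self.transferred_bytes.load(Ordering::Acquire)
    }

    /// Bytes still allowed in the current window; never negative.
    fn can_transfer_max_byte_num(&self) -> i64 {
        let limit = self.max_bytes_per_second.load(Ordering::Acquire);
        (limit - self.transferred_bytes()).max(0)
    }
}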

rocketmq-store/src/ha/default_ha_connection.rs (1)

217-253: Read service’s private channel is also unused

ReadSocketService::new creates its own shutdown_sender, but nothing ever sends or receives on this channel. Remove the field or wire it into a real shutdown signal; otherwise the extra allocation / field is wasted and confuses future maintainers.

Comment on lines +55 to +60
// Add any necessary fields here

pub fn get_default_message_store(&self) -> &LocalFileMessageStore {
unimplemented!()
}
}

🛠️ Refactor suggestion

get_default_message_store cannot be implemented on an empty struct

The method promises &LocalFileMessageStore but DefaultHAService holds no such field, so a safe reference can never be returned.
Either:

  1. Store an Arc<LocalFileMessageStore> inside the struct and return &self.message_store, or
  2. Change the signature to return an Arc<LocalFileMessageStore> (or similar ownership).

Leaving it as unimplemented!() will panic at first call and masks the real design gap.

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_service.rs around lines 55 to 60, the method
get_default_message_store returns a reference to LocalFileMessageStore but the
struct DefaultHAService has no such field, making it impossible to return a safe
reference. To fix this, add a field like Arc<LocalFileMessageStore> to
DefaultHAService and return a reference to it, or change the method signature to
return an Arc<LocalFileMessageStore> and return the owned value. Remove the
unimplemented!() placeholder to avoid runtime panics.
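
Option 1 could look roughly like the following; the field, constructor, and Arc-based ownership are assumptions about how DefaultHAService might be wired, not the crate's actual code.

use std::sync::Arc;

// Stand-in for rocketmq-store's LocalFileMessageStore.
struct LocalFileMessageStore;

struct HaServiceSketch {
    message_store: Arc<LocalFileMessageStore>,
}

impl HaServiceSketch {
    fn new(message_store: Arc<LocalFileMessageStore>) -> Self {
        Self { message_store }
    }

    // Returning a clone of the Arc (option 2) keeps ownership flexible for
    // spawned tasks; returning `&LocalFileMessageStore` (option 1) avoids the
    // refcount bump but ties the borrow to `&self`.
    fn get_default_message_store(&self) -> Arc<LocalFileMessageStore> {
        Arc::clone(&self.message_store)
    }
}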

Comment on lines +305 to +312
let mut socket_guard = socket_stream.write().await;
if let Some(ref mut socket) = socket_guard.as_mut() {
match timeout(
Duration::from_secs(1),
socket.read(&mut buffer[process_position..]),
)
.await
{

🛠️ Refactor suggestion

Heavy contention on a single RwLock around the socket

Both read- and write-tasks grab the write side of the same RwLock<Option<TcpStream>> for every I/O call. This serialises all reads and writes (no parallelism, unnecessary context-switching) and may dead-lock if either task stalls while holding the lock.

A simpler & faster pattern:

let (mut reader, mut writer) = socket.into_split();   // returns independent halves

Store each half behind its own Mutex (or none at all) so reads and writes progress concurrently without blocking each other.

Also applies to: 577-591

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs around lines 305 to 312, the
code uses a single RwLock<Option<TcpStream>> for both reading and writing,
causing heavy contention and potential deadlocks. Refactor by splitting the
TcpStream into separate read and write halves using the into_split() method,
then store each half behind its own Mutex or no lock if safe, allowing
concurrent reads and writes without blocking each other. Apply the same pattern
to lines 577 to 591.
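
A sketch of the split-stream pattern the comment points to, assuming tokio's TcpStream and invented task bodies:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn run_connection(socket: TcpStream) -> std::io::Result<()> {
    // OwnedReadHalf and OwnedWriteHalf can move into separate tasks, so the
    // reader never waits on a lock held by the writer (and vice versa).
    let (mut reader, mut writer) = socket.into_split();

    let read_task = tokio::spawn(async move {
        let mut buf = [0u8; 8];
        while reader.read_exact(&mut buf).await.is_ok() {
            let _slave_offset = i64::from_be_bytes(buf); // handle offset report
        }
    });

    let write_task = tokio::spawn(async move {
        // push data batches / heartbeat headers here
        let _ = writer.write_all(&[0u8; 12]).await;
    });

    let _ = tokio::join!(read_task, write_task);
    Ok(())
}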

Comment on lines +81 to +85
// ha_service.increment_connection_count();

let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);

Ok(Self {

⚠️ Potential issue

shutdown_receiver is created but never used – remove or utilise it

let (shutdown_sender, shutdown_receiver) = … immediately discards shutdown_receiver, triggering an “unused variable” warning and signalling a design hole (the connection has no central shutdown listener).
Either:

-let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
+let (shutdown_sender, _shutdown_receiver) = mpsc::channel(1);   // silence warning

or keep the receiver and drive coordinated shutdown through it.
Leaving dead code lowers maintainability and may break #![deny(warnings)] builds.

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs around lines 81 to 85, the
variable shutdown_receiver is created but never used, causing an unused variable
warning. To fix this, either remove the shutdown_receiver if it is unnecessary,
or retain it and implement logic to listen for shutdown signals through it to
enable coordinated shutdown. Ensure the code uses shutdown_receiver properly or
eliminates it to avoid dead code and warnings.
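
If the receiver is kept, the usual shape is to race it against the I/O future with tokio::select!; a minimal sketch under assumed names:

use std::time::Duration;

use tokio::sync::mpsc;

async fn read_loop(mut shutdown_rx: mpsc::Receiver<()>) {
    loop {
        tokio::select! {
            // A real loop would await the socket read here instead of a timer.
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                // process a slave offset report ...
            }
            _ = shutdown_rx.recv() => {
                break; // coordinated shutdown: exit the task cleanly
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
    let task = tokio::spawn(read_loop(shutdown_rx));
    let _ = shutdown_tx.send(()).await;
    let _ = task.await;
}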

Comment on lines +529 to +556
if let Some(data) = MessageStore::get_commit_log_data(
ha_service.get_default_message_store(),
next_transfer_from_where.load(Ordering::SeqCst),
) {
//let mut size = data.len();
let mut size = 0_i32;
let max_batch_size = message_store_config.ha_transfer_batch_size as i32;
if size > max_batch_size {
size = max_batch_size;
}

let can_transfer_max = flow_monitor.can_transfer_max_byte_num();
if size > can_transfer_max {
size = can_transfer_max;
}

let this_offset = next_transfer_from_where.load(Ordering::SeqCst);
next_transfer_from_where.store(this_offset + size as i64, Ordering::SeqCst);

let header = Self::build_header(this_offset, size);

// Transfer header and data
if Self::transfer_header(&socket_stream, &header, &flow_monitor).await {
/*last_write_over =
Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;*/
last_write_timestamp = Instant::now();
}
} else {

⚠️ Potential issue

Data never transferred – core lines commented out

size is forcibly set to 0, transfer_data is commented, so the write side only pushes heart-beats. Slaves will never receive commit-log bodies.

-// let mut size = data.len();
-let mut size = 0_i32;
+let mut size = data.len() as i32;
...
-/*last_write_over =
-    Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;*/
+last_write_over =
+    Self::transfer_data(&socket_stream, &data[..size as usize], &flow_monitor).await;

Without this fix HA replication is functionally broken.

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs between lines 529 and 556, the
variable 'size' is incorrectly set to 0, and the call to transfer_data is
commented out, preventing actual commit-log data from being transferred and
causing HA replication to fail. To fix this, set 'size' to the length of the
data retrieved from get_commit_log_data, ensure it is capped by max_batch_size
and can_transfer_max_byte_num as before, and uncomment and enable the call to
transfer_data to send the commit-log data after transferring the header.

Comment on lines +484 to +508
if next_transfer_from_where.load(Ordering::SeqCst) == -1 {
let slave_offset = slave_request_offset.load(Ordering::SeqCst);
let transfer_offset = if slave_offset == 0 {
/*let master_offset = ha_service
.get_message_store()::<MS>()
.get_commit_log()
.get_max_offset();*/
let master_offset = 0;
let mapped_file_size = message_store_config.mapped_file_size_commit_log;
let aligned_offset = master_offset - (master_offset % mapped_file_size as i64);
if aligned_offset < 0 {
0
} else {
aligned_offset
}
} else {
slave_offset
};

next_transfer_from_where.store(transfer_offset, Ordering::SeqCst);
info!(
"master transfer data from {} to slave[{}], and slave request {}",
transfer_offset, client_address, slave_offset
);
}

⚠️ Potential issue

Placeholder master_offset = 0 breaks initial offset calculation

Replication will always start at offset 0, ignoring the real commit-log tail. Replace the placeholder with the actual value:

- let master_offset = 0;
+ let master_offset = ha_service
+     .get_default_message_store()
+     .commit_log_max_offset();   // adjust to real API

Failing to do so may resend gigabytes of data unnecessarily or corrupt offsets on slaves.

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs between lines 484 and 508, the
code currently sets master_offset to 0 as a placeholder, which causes
replication to always start at offset 0 and ignores the actual commit-log tail.
To fix this, replace the placeholder with a call to retrieve the real maximum
offset from the commit log via
ha_service.get_message_store().get_commit_log().get_max_offset(). This ensures
the initial offset calculation uses the correct commit-log tail, preventing
unnecessary data resend or offset corruption on slaves.


codecov bot commented Jun 17, 2025

Codecov Report

Attention: Patch coverage is 0.49261% with 404 lines in your changes missing coverage. Please review.

Project coverage is 26.47%. Comparing base (bce8a82) to head (2dca1a9).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
rocketmq-store/src/ha/default_ha_connection.rs 0.00% 380 Missing ⚠️
rocketmq-store/src/ha/flow_monitor.rs 0.00% 13 Missing ⚠️
rocketmq-store/src/config/message_store_config.rs 25.00% 6 Missing ⚠️
rocketmq/src/task/service_task.rs 0.00% 3 Missing ⚠️
rocketmq-store/src/ha/default_ha_service.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3477      +/-   ##
==========================================
- Coverage   26.60%   26.47%   -0.14%     
==========================================
  Files         546      547       +1     
  Lines       77978    78382     +404     
==========================================
  Hits        20749    20749              
- Misses      57229    57633     +404     


@rocketmq-rust-bot rocketmq-rust-bot left a comment

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit 5a000d5 into main Jun 17, 2025
22 of 24 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added the approved (PR has approved) label and removed the ready to review and waiting-review (waiting review this PR) labels on Jun 17, 2025