Skip to content

[ISSUE #3527]⚡️Delegation pattern implementation with error handling for HA connections #3528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions rocketmq-store/src/ha/default_ha_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,14 @@
unimplemented!("Auto-switching is not implemented yet");
}else{
let default_conn = DefaultHAConnection::new(default_ha_service.clone(), stream,message_store_config.clone()).await.expect("Error creating HAConnection");
let general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn);
default_ha_service.add_connection(general_conn).await;
let mut general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn);
if let Err(e) = general_conn.start().await {
error!("Error starting HAService: {}", e);

Check warning on line 299 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L297-L299

Added lines #L297 - L299 were not covered by tests
Copy link
Preview

Copilot AI Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On startup failure, the connection isn't explicitly closed or cleaned up. Consider closing the underlying stream or reporting the error upstream to avoid resource leaks.

Suggested change
error!("Error starting HAService: {}", e);
error!("Error starting HAService: {}", e);
// Explicitly clean up resources to prevent leaks
general_conn.cleanup().await;

Copilot uses AI. Check for mistakes.

}else {
info!("HAService accept new connection, {}", addr);
default_ha_service.add_connection(general_conn).await;

Check warning on line 302 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L301-L302

Added lines #L301 - L302 were not covered by tests
}

};
}
Err(e) => {
Expand Down
10 changes: 9 additions & 1 deletion rocketmq-store/src/ha/general_ha_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@

impl HAConnection for GeneralHAConnection {
async fn start(&mut self) -> Result<(), HAConnectionError> {
todo!()
if let Some(ref mut connection) = self.default_ha_connection {
connection.start().await
} else if let Some(ref mut connection) = self.auto_switch_ha_connection {
connection.start().await

Check warning on line 69 in rocketmq-store/src/ha/general_ha_connection.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_connection.rs#L66-L69

Added lines #L66 - L69 were not covered by tests
} else {
Err(HAConnectionError::Connection(
"No HA connection set".to_string(),
))

Check warning on line 73 in rocketmq-store/src/ha/general_ha_connection.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_connection.rs#L71-L73

Added lines #L71 - L73 were not covered by tests
Comment on lines +66 to +73
Copy link
Preview

Copilot AI Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The nested if let blocks could be refactored into a single match on both Options (or by chaining with or) to reduce duplication and improve clarity.

Suggested change
if let Some(ref mut connection) = self.default_ha_connection {
connection.start().await
} else if let Some(ref mut connection) = self.auto_switch_ha_connection {
connection.start().await
} else {
Err(HAConnectionError::Connection(
"No HA connection set".to_string(),
))
match (&mut self.default_ha_connection, &mut self.auto_switch_ha_connection) {
(Some(connection), _) => connection.start().await,
(_, Some(connection)) => connection.start().await,
(None, None) => Err(HAConnectionError::Connection(
"No HA connection set".to_string(),
)),

Copilot uses AI. Check for mistakes.

}
}

async fn shutdown(&mut self) {
Expand Down
Loading