Skip to content

Commit 1c62b0a

Browse files
authored
[ISSUE #3527]⚡️Delegation pattern implementation with error handling for HA connections (#3528)
1 parent 32816e2 commit 1c62b0a

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

rocketmq-store/src/ha/default_ha_service.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,14 @@ impl AcceptSocketService {
294294
unimplemented!("Auto-switching is not implemented yet");
295295
}else{
296296
let default_conn = DefaultHAConnection::new(default_ha_service.clone(), stream,message_store_config.clone()).await.expect("Error creating HAConnection");
297-
let general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn);
298-
default_ha_service.add_connection(general_conn).await;
297+
let mut general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn);
298+
if let Err(e) = general_conn.start().await {
299+
error!("Error starting HAService: {}", e);
300+
}else {
301+
info!("HAService accept new connection, {}", addr);
302+
default_ha_service.add_connection(general_conn).await;
303+
}
304+
299305
};
300306
}
301307
Err(e) => {

rocketmq-store/src/ha/general_ha_connection.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,15 @@ impl GeneralHAConnection {
6363

6464
impl HAConnection for GeneralHAConnection {
6565
async fn start(&mut self) -> Result<(), HAConnectionError> {
66-
todo!()
66+
if let Some(ref mut connection) = self.default_ha_connection {
67+
connection.start().await
68+
} else if let Some(ref mut connection) = self.auto_switch_ha_connection {
69+
connection.start().await
70+
} else {
71+
Err(HAConnectionError::Connection(
72+
"No HA connection set".to_string(),
73+
))
74+
}
6775
}
6876

6977
async fn shutdown(&mut self) {

0 commit comments

Comments
 (0)