diff --git a/rocketmq-store/src/ha/default_ha_service.rs b/rocketmq-store/src/ha/default_ha_service.rs index 0f0fb800..307515d0 100644 --- a/rocketmq-store/src/ha/default_ha_service.rs +++ b/rocketmq-store/src/ha/default_ha_service.rs @@ -294,8 +294,14 @@ impl AcceptSocketService { unimplemented!("Auto-switching is not implemented yet"); }else{ let default_conn = DefaultHAConnection::new(default_ha_service.clone(), stream,message_store_config.clone()).await.expect("Error creating HAConnection"); - let general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn); - default_ha_service.add_connection(general_conn).await; + let mut general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn); + if let Err(e) = general_conn.start().await { + error!("Error starting HAService: {}", e); + }else { + info!("HAService accept new connection, {}", addr); + default_ha_service.add_connection(general_conn).await; + } + }; } Err(e) => { diff --git a/rocketmq-store/src/ha/general_ha_connection.rs b/rocketmq-store/src/ha/general_ha_connection.rs index 196d0aca..6b3fb7e5 100644 --- a/rocketmq-store/src/ha/general_ha_connection.rs +++ b/rocketmq-store/src/ha/general_ha_connection.rs @@ -63,7 +63,15 @@ impl GeneralHAConnection { impl HAConnection for GeneralHAConnection { async fn start(&mut self) -> Result<(), HAConnectionError> { - todo!() + if let Some(ref mut connection) = self.default_ha_connection { + connection.start().await + } else if let Some(ref mut connection) = self.auto_switch_ha_connection { + connection.start().await + } else { + Err(HAConnectionError::Connection( + "No HA connection set".to_string(), + )) + } } async fn shutdown(&mut self) {