Skip to content

Commit 32816e2

Browse files
authored
[ISSUE #3525]⚡️Trait-based architecture for HA connections with unified error handling (#3526)
1 parent 2077620 commit 32816e2

File tree

5 files changed

+153
-45
lines changed

5 files changed

+153
-45
lines changed

rocketmq-store/src/ha.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,14 @@ pub(crate) mod ha_connection_state_notification_request;
3131
mod ha_connection_state_notification_service;
3232
pub(crate) mod ha_service;
3333
pub(crate) mod wait_notify_object;
34+
35+
/// Error types
36+
#[derive(Debug, thiserror::Error)]
37+
pub enum HAConnectionError {
38+
#[error("IO error: {0}")]
39+
Io(#[from] std::io::Error),
40+
#[error("Connection error: {0}")]
41+
Connection(String),
42+
#[error("Service error: {0}")]
43+
Service(String),
44+
}

rocketmq-store/src/ha/auto_switch/auto_switch_ha_connection.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,49 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
18+
use tokio::net::TcpStream;
19+
20+
use crate::ha::ha_connection::HAConnection;
21+
use crate::ha::ha_connection_state::HAConnectionState;
22+
use crate::ha::HAConnectionError;
23+
1724
pub struct AutoSwitchHAConnection;
25+
26+
impl HAConnection for AutoSwitchHAConnection {
27+
async fn start(&mut self) -> Result<(), HAConnectionError> {
28+
todo!()
29+
}
30+
31+
async fn shutdown(&mut self) {
32+
todo!()
33+
}
34+
35+
fn close(&self) {
36+
todo!()
37+
}
38+
39+
fn get_socket(&self) -> &TcpStream {
40+
todo!()
41+
}
42+
43+
async fn get_current_state(&self) -> HAConnectionState {
44+
todo!()
45+
}
46+
47+
fn get_client_address(&self) -> &str {
48+
todo!()
49+
}
50+
51+
fn get_transferred_byte_in_second(&self) -> i64 {
52+
todo!()
53+
}
54+
55+
fn get_transfer_from_where(&self) -> i64 {
56+
todo!()
57+
}
58+
59+
fn get_slave_ack_offset(&self) -> i64 {
60+
todo!()
61+
}
62+
}

rocketmq-store/src/ha/default_ha_connection.rs

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ use crate::base::message_store::MessageStore;
4343
use crate::config::message_store_config::MessageStoreConfig;
4444
use crate::ha::default_ha_service::DefaultHAService;
4545
use crate::ha::flow_monitor::FlowMonitor;
46+
use crate::ha::ha_connection::HAConnection;
4647
use crate::ha::ha_connection_state::HAConnectionState;
48+
use crate::ha::HAConnectionError;
4749

4850
/// Transfer Header buffer size. Schema: physic offset and body size.
4951
/// Format: [physicOffset (8bytes)][bodySize (4bytes)]
@@ -106,8 +108,40 @@ impl DefaultHAConnection {
106108
})
107109
}
108110

109-
/// Start the HA connection services
110-
pub async fn start(&mut self) -> Result<(), HAConnectionError> {
111+
/// Change the current state
112+
pub async fn change_current_state(&self, new_state: HAConnectionState) {
113+
info!("change state to {:?}", new_state);
114+
let mut state_guard = self.current_state.write().await;
115+
*state_guard = new_state;
116+
}
117+
118+
/// Get current state
119+
pub async fn get_current_state(&self) -> HAConnectionState {
120+
*self.current_state.read().await
121+
}
122+
123+
/// Get slave ack offset
124+
pub fn get_slave_ack_offset(&self) -> i64 {
125+
self.slave_ack_offset.load(Ordering::SeqCst)
126+
}
127+
128+
/// Get transferred bytes per second
129+
pub fn get_transferred_byte_in_second(&self) -> u64 {
130+
self.flow_monitor.get_transferred_byte_in_second() as u64
131+
}
132+
133+
/// Get transfer from where
134+
pub fn get_transfer_from_where(&self) -> i64 {
135+
if let Some(ref write_service) = self.write_socket_service {
136+
write_service.get_next_transfer_from_where()
137+
} else {
138+
-1
139+
}
140+
}
141+
}
142+
143+
impl HAConnection for DefaultHAConnection {
144+
async fn start(&mut self) -> Result<(), HAConnectionError> {
111145
self.change_current_state(HAConnectionState::Transfer).await;
112146

113147
// Start flow monitor
@@ -155,8 +189,7 @@ impl DefaultHAConnection {
155189
Ok(())
156190
}
157191

158-
/// Shutdown the HA connection
159-
pub async fn shutdown(&mut self) {
192+
async fn shutdown(&mut self) {
160193
self.change_current_state(HAConnectionState::Shutdown).await;
161194

162195
// Shutdown services
@@ -171,14 +204,13 @@ impl DefaultHAConnection {
171204
self.flow_monitor.shutdown().await;
172205

173206
// Close socket
174-
self.close().await;
207+
self.close();
175208

176209
// Decrement connection count
177210
//self.ha_service.decrement_connection_count();
178211
}
179212

180-
/// Close the socket connection
181-
pub async fn close(&self) {
213+
fn close(&self) {
182214
/*let mut socket_guard = self.socket_stream.write().await;
183215
if let Some(mut socket) = socket_guard.take() {
184216
if let Err(e) = socket.shutdown().await {
@@ -187,40 +219,28 @@ impl DefaultHAConnection {
187219
}*/
188220
}
189221

190-
/// Change the current state
191-
pub async fn change_current_state(&self, new_state: HAConnectionState) {
192-
info!("change state to {:?}", new_state);
193-
let mut state_guard = self.current_state.write().await;
194-
*state_guard = new_state;
222+
fn get_socket(&self) -> &TcpStream {
223+
todo!()
195224
}
196225

197-
/// Get current state
198-
pub async fn get_current_state(&self) -> HAConnectionState {
226+
async fn get_current_state(&self) -> HAConnectionState {
199227
*self.current_state.read().await
200228
}
201229

202-
/// Get client address
203-
pub fn get_client_address(&self) -> &str {
230+
fn get_client_address(&self) -> &str {
204231
&self.client_address
205232
}
206233

207-
/// Get slave ack offset
208-
pub fn get_slave_ack_offset(&self) -> i64 {
209-
self.slave_ack_offset.load(Ordering::SeqCst)
234+
fn get_transferred_byte_in_second(&self) -> i64 {
235+
todo!()
210236
}
211237

212-
/// Get transferred bytes per second
213-
pub fn get_transferred_byte_in_second(&self) -> u64 {
214-
self.flow_monitor.get_transferred_byte_in_second() as u64
238+
fn get_transfer_from_where(&self) -> i64 {
239+
todo!()
215240
}
216241

217-
/// Get transfer from where
218-
pub fn get_transfer_from_where(&self) -> i64 {
219-
if let Some(ref write_service) = self.write_socket_service {
220-
write_service.get_next_transfer_from_where()
221-
} else {
222-
-1
223-
}
242+
fn get_slave_ack_offset(&self) -> i64 {
243+
todo!()
224244
}
225245
}
226246

@@ -634,14 +654,3 @@ impl WriteSocketService {
634654
}
635655
}
636656
}
637-
638-
/// Error types
639-
#[derive(Debug, thiserror::Error)]
640-
pub enum HAConnectionError {
641-
#[error("IO error: {0}")]
642-
Io(#[from] std::io::Error),
643-
#[error("Connection error: {0}")]
644-
Connection(String),
645-
#[error("Service error: {0}")]
646-
Service(String),
647-
}

rocketmq-store/src/ha/general_ha_connection.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
* limitations under the License.
1616
*/
1717

18+
use tokio::net::TcpStream;
19+
1820
use crate::ha::auto_switch::auto_switch_ha_connection::AutoSwitchHAConnection;
1921
use crate::ha::default_ha_connection::DefaultHAConnection;
22+
use crate::ha::ha_connection::HAConnection;
23+
use crate::ha::ha_connection_state::HAConnectionState;
24+
use crate::ha::HAConnectionError;
2025

2126
pub struct GeneralHAConnection {
2227
default_ha_connection: Option<DefaultHAConnection>,
@@ -55,3 +60,41 @@ impl GeneralHAConnection {
5560
self.auto_switch_ha_connection = Some(connection);
5661
}
5762
}
63+
64+
impl HAConnection for GeneralHAConnection {
65+
async fn start(&mut self) -> Result<(), HAConnectionError> {
66+
todo!()
67+
}
68+
69+
async fn shutdown(&mut self) {
70+
todo!()
71+
}
72+
73+
fn close(&self) {
74+
todo!()
75+
}
76+
77+
fn get_socket(&self) -> &TcpStream {
78+
todo!()
79+
}
80+
81+
async fn get_current_state(&self) -> HAConnectionState {
82+
todo!()
83+
}
84+
85+
fn get_client_address(&self) -> &str {
86+
todo!()
87+
}
88+
89+
fn get_transferred_byte_in_second(&self) -> i64 {
90+
todo!()
91+
}
92+
93+
fn get_transfer_from_where(&self) -> i64 {
94+
todo!()
95+
}
96+
97+
fn get_slave_ack_offset(&self) -> i64 {
98+
todo!()
99+
}
100+
}

rocketmq-store/src/ha/ha_connection.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,23 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::net::SocketAddr;
1817

1918
use tokio::net::TcpStream;
2019

2120
use crate::ha::ha_connection_state::HAConnectionState;
21+
use crate::ha::HAConnectionError;
2222

2323
#[trait_variant::make(HAConnection: Send)]
2424
pub trait RocketmqHAConnection: Sync {
2525
/// Start the HA connection
2626
///
2727
/// This initiates the connection threads and begins processing.
28-
async fn start(&self);
28+
async fn start(&mut self) -> Result<(), HAConnectionError>;
2929

3030
/// Shutdown the HA connection gracefully
3131
///
3232
/// This initiates a clean shutdown of the connection.
33-
async fn shutdown(&self);
33+
async fn shutdown(&mut self);
3434

3535
/// Close the HA connection immediately
3636
///
@@ -47,13 +47,13 @@ pub trait RocketmqHAConnection: Sync {
4747
///
4848
/// # Returns
4949
/// Current connection state
50-
fn get_current_state(&self) -> HAConnectionState;
50+
async fn get_current_state(&self) -> HAConnectionState;
5151

5252
/// Get the client address for this connection
5353
///
5454
/// # Returns
5555
/// Socket address of the connected client
56-
fn get_client_address(&self) -> SocketAddr;
56+
fn get_client_address(&self) -> &str;
5757

5858
/// Get the data transfer rate per second
5959
///

0 commit comments

Comments
 (0)