Skip to content

Commit 2a9a342

Browse files
authored
[ISSUE #1067] Supprot mq fault strategy (#1068)
1 parent 31dbf92 commit 2a9a342

File tree

9 files changed

+275
-100
lines changed

9 files changed

+275
-100
lines changed

rocketmq-client/src/implementation/mq_client_api_impl.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,9 @@ impl MQClientAPIImpl {
457457
}
458458
}
459459
let duration = (Instant::now() - begin_start_time).as_millis() as u64;
460-
producer.update_fault_item(broker_name, duration, false, true);
460+
producer
461+
.update_fault_item(broker_name, duration, false, true)
462+
.await;
461463
return;
462464
}
463465
let send_result = self.process_send_response(broker_name, msg, &response, addr);
@@ -470,11 +472,15 @@ impl MQClientAPIImpl {
470472
}
471473
let duration = (Instant::now() - begin_start_time).as_millis() as u64;
472474
send_callback.as_ref().unwrap()(Some(&result), None);
473-
producer.update_fault_item(broker_name, duration, false, true);
475+
producer
476+
.update_fault_item(broker_name, duration, false, true)
477+
.await;
474478
}
475479
Err(err) => {
476480
let duration = (Instant::now() - begin_start_time).as_millis() as u64;
477-
producer.update_fault_item(broker_name, duration, true, true);
481+
producer
482+
.update_fault_item(broker_name, duration, true, true)
483+
.await;
478484
Box::pin(self.on_exception_impl(
479485
broker_name,
480486
msg,

rocketmq-client/src/latency/latency_fault_tolerance.rs

+18-12
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use crate::latency::resolver::Resolver;
18-
use crate::latency::service_detector::ServiceDetector;
17+
use std::any::Any;
1918

20-
pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
19+
use rocketmq_common::ArcRefCellWrapper;
20+
21+
#[allow(async_fn_in_trait)]
22+
pub trait LatencyFaultTolerance<T, R, S>: Send + Sync + 'static {
2123
/// Update brokers' states, to decide if they are good or not.
2224
///
2325
/// # Arguments
@@ -27,7 +29,7 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
2729
/// * `not_available_duration` - Corresponding not available time, ms. The broker will be not
2830
/// available until it
2931
/// * `reachable` - To decide if this broker is reachable or not.
30-
fn update_fault_item(
32+
async fn update_fault_item(
3133
&mut self,
3234
name: T,
3335
current_latency: u64,
@@ -44,7 +46,7 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
4446
/// # Returns
4547
///
4648
/// * `true` if the broker is available, `false` otherwise.
47-
fn is_available(&self, name: &T) -> bool;
49+
async fn is_available(&self, name: &T) -> bool;
4850

4951
/// To check if this broker is reachable.
5052
///
@@ -55,30 +57,30 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
5557
/// # Returns
5658
///
5759
/// * `true` if the broker is reachable, `false` otherwise.
58-
fn is_reachable(&self, name: &T) -> bool;
60+
async fn is_reachable(&self, name: &T) -> bool;
5961

6062
/// Remove the broker in this fault item table.
6163
///
6264
/// # Arguments
6365
///
6466
/// * `name` - Broker's name.
65-
fn remove(&mut self, name: &T);
67+
async fn remove(&mut self, name: &T);
6668

6769
/// The worst situation, no broker can be available. Then choose a random one.
6870
///
6971
/// # Returns
7072
///
7173
/// * A random broker will be returned.
72-
fn pick_one_at_least(&self) -> T;
74+
async fn pick_one_at_least(&self) -> Option<T>;
7375

7476
/// Start a new thread, to detect the broker's reachable tag.
75-
fn start_detector(&self);
77+
fn start_detector(this: ArcRefCellWrapper<Self>);
7678

7779
/// Shutdown threads that started by `LatencyFaultTolerance`.
7880
fn shutdown(&self);
7981

8082
/// A function reserved, just detect by once, won't create a new thread.
81-
fn detect_by_one_round(&self);
83+
async fn detect_by_one_round(&self);
8284

8385
/// Use it to set the detect timeout bound.
8486
///
@@ -109,7 +111,11 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
109111
/// * `true` if the detector should be started, `false` otherwise.
110112
fn is_start_detector_enable(&self) -> bool;
111113

112-
fn set_resolver(&mut self, resolver: Box<dyn Resolver>);
114+
fn set_resolver(&mut self, resolver: R);
115+
116+
fn set_service_detector(&mut self, service_detector: S);
117+
118+
fn as_any(&self) -> &dyn Any;
113119

114-
fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>);
120+
fn as_any_mut(&mut self) -> &mut dyn Any;
115121
}

rocketmq-client/src/latency/latency_fault_tolerance_impl.rs

+122-28
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,25 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::any::Any;
1718
use std::collections::HashMap;
19+
use std::collections::HashSet;
1820

1921
use crate::latency::latency_fault_tolerance::LatencyFaultTolerance;
2022
use crate::latency::resolver::Resolver;
2123
use crate::latency::service_detector::ServiceDetector;
2224

23-
pub struct LatencyFaultToleranceImpl {
24-
fault_item_table: parking_lot::Mutex<HashMap<String, FaultItem>>,
25-
detect_timeout: i32,
26-
detect_interval: i32,
25+
pub struct LatencyFaultToleranceImpl<R, S> {
26+
fault_item_table: tokio::sync::Mutex<HashMap<String, FaultItem>>,
27+
detect_timeout: u32,
28+
detect_interval: u32,
2729
which_item_worst: ThreadLocalIndex,
2830
start_detector_enable: AtomicBool,
29-
resolver: Option<Box<dyn Resolver>>,
30-
service_detector: Option<Box<dyn ServiceDetector>>,
31+
resolver: Option<R>,
32+
service_detector: Option<S>,
3133
}
3234

33-
impl LatencyFaultToleranceImpl {
35+
impl<R, S> LatencyFaultToleranceImpl<R, S> {
3436
pub fn new(/*fetcher: impl Resolver, service_detector: impl ServiceDetector*/) -> Self {
3537
Self {
3638
resolver: None,
@@ -44,15 +46,19 @@ impl LatencyFaultToleranceImpl {
4446
}
4547
}
4648

47-
impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
48-
fn update_fault_item(
49+
impl<R, S> LatencyFaultTolerance<String, R, S> for LatencyFaultToleranceImpl<R, S>
50+
where
51+
R: Resolver,
52+
S: ServiceDetector,
53+
{
54+
async fn update_fault_item(
4955
&mut self,
5056
name: String,
5157
current_latency: u64,
5258
not_available_duration: u64,
5359
reachable: bool,
5460
) {
55-
let mut table = self.fault_item_table.lock();
61+
let mut table = self.fault_item_table.lock().await;
5662
let fault_item = table
5763
.entry(name.clone())
5864
.or_insert_with(|| FaultItem::new(name.clone()));
@@ -69,42 +75,120 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
6975
}
7076
}
7177

72-
fn is_available(&self, name: &String) -> bool {
73-
let fault_item_table = self.fault_item_table.lock();
78+
async fn is_available(&self, name: &String) -> bool {
79+
let fault_item_table = self.fault_item_table.lock().await;
7480
if let Some(fault_item) = fault_item_table.get(name) {
7581
return fault_item.is_available();
7682
}
7783
true
7884
}
7985

80-
fn is_reachable(&self, name: &String) -> bool {
81-
todo!()
86+
async fn is_reachable(&self, name: &String) -> bool {
87+
let fault_item_table = self.fault_item_table.lock().await;
88+
if let Some(fault_item) = fault_item_table.get(name) {
89+
return fault_item.is_reachable();
90+
}
91+
true
8292
}
8393

84-
fn remove(&mut self, name: &String) {
85-
todo!()
94+
async fn remove(&mut self, name: &String) {
95+
self.fault_item_table.lock().await.remove(name);
8696
}
8797

88-
fn pick_one_at_least(&self) -> String {
89-
todo!()
98+
async fn pick_one_at_least(&self) -> Option<String> {
99+
let fault_item_table = self.fault_item_table.lock().await;
100+
let mut tmp_list: Vec<_> = fault_item_table.values().collect();
101+
102+
if !tmp_list.is_empty() {
103+
use rand::seq::SliceRandom;
104+
let mut rng = rand::thread_rng();
105+
tmp_list.shuffle(&mut rng);
106+
for fault_item in tmp_list {
107+
if fault_item
108+
.reachable_flag
109+
.load(std::sync::atomic::Ordering::Acquire)
110+
{
111+
return Some(fault_item.name.clone());
112+
}
113+
}
114+
}
115+
None
90116
}
91117

92-
fn start_detector(&self) {
93-
todo!()
118+
fn start_detector(this: ArcRefCellWrapper<Self>) {
119+
tokio::spawn(async move {
120+
loop {
121+
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
122+
if !this
123+
.start_detector_enable
124+
.load(std::sync::atomic::Ordering::Relaxed)
125+
{
126+
continue;
127+
}
128+
129+
this.detect_by_one_round().await;
130+
}
131+
});
94132
}
95133

96134
fn shutdown(&self) {}
97135

98-
fn detect_by_one_round(&self) {
99-
todo!()
136+
async fn detect_by_one_round(&self) {
137+
let mut fault_item_table = self.fault_item_table.lock().await;
138+
let mut remove_set = HashSet::new();
139+
for (name, fault_item) in fault_item_table.iter() {
140+
if get_current_millis() as i64
141+
- (fault_item
142+
.check_stamp
143+
.load(std::sync::atomic::Ordering::Relaxed) as i64)
144+
< 0
145+
{
146+
continue;
147+
}
148+
fault_item.check_stamp.store(
149+
get_current_millis() + self.detect_interval as u64,
150+
std::sync::atomic::Ordering::Release,
151+
);
152+
let broker_addr = self
153+
.resolver
154+
.as_ref()
155+
.unwrap()
156+
.resolve(fault_item.name.as_str())
157+
.await;
158+
if broker_addr.is_none() {
159+
remove_set.insert(name.clone());
160+
continue;
161+
}
162+
if self.service_detector.is_none() {
163+
continue;
164+
}
165+
let service_ok = self
166+
.service_detector
167+
.as_ref()
168+
.unwrap()
169+
.detect(broker_addr.unwrap().as_str(), self.detect_timeout as u64);
170+
if service_ok
171+
&& fault_item
172+
.reachable_flag
173+
.load(std::sync::atomic::Ordering::Acquire)
174+
{
175+
info!("{} is reachable now, then it can be used.", name);
176+
fault_item
177+
.reachable_flag
178+
.store(true, std::sync::atomic::Ordering::Release);
179+
}
180+
}
181+
for name in remove_set {
182+
fault_item_table.remove(&name);
183+
}
100184
}
101185

102186
fn set_detect_timeout(&mut self, detect_timeout: u32) {
103-
todo!()
187+
self.detect_timeout = detect_timeout;
104188
}
105189

106190
fn set_detect_interval(&mut self, detect_interval: u32) {
107-
todo!()
191+
self.detect_interval = detect_interval;
108192
}
109193

110194
fn set_start_detector_enable(&mut self, start_detector_enable: bool) {
@@ -113,22 +197,32 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
113197
}
114198

115199
fn is_start_detector_enable(&self) -> bool {
116-
todo!()
200+
self.start_detector_enable
201+
.load(std::sync::atomic::Ordering::Acquire)
117202
}
118203

119-
fn set_resolver(&mut self, resolver: Box<dyn Resolver>) {
204+
fn set_resolver(&mut self, resolver: R) {
120205
self.resolver = Some(resolver);
121206
}
122207

123-
fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>) {
208+
fn set_service_detector(&mut self, service_detector: S) {
124209
self.service_detector = Some(service_detector);
125210
}
211+
212+
fn as_any(&self) -> &dyn Any {
213+
self
214+
}
215+
216+
fn as_any_mut(&mut self) -> &mut dyn Any {
217+
self
218+
}
126219
}
127220

128221
use std::cmp::Ordering;
129222
use std::hash::Hash;
130223
use std::sync::atomic::AtomicBool;
131224

225+
use rocketmq_common::ArcRefCellWrapper;
132226
use rocketmq_common::TimeUtils::get_current_millis;
133227
use tracing::info;
134228

@@ -166,7 +260,7 @@ impl FaultItem {
166260
now + not_available_duration,
167261
std::sync::atomic::Ordering::Relaxed,
168262
);
169-
println!(
263+
info!(
170264
"{} will be isolated for {} ms.",
171265
self.name, not_available_duration
172266
);

0 commit comments

Comments
 (0)