Skip to content

Commit ce880f4

Browse files
authored
[ISSUE #2046]💫Implement PopBufferMergeService#putCkToStore🧑‍💻 (#2049)
1 parent 574d67b commit ce880f4

File tree

3 files changed

+72
-19
lines changed

3 files changed

+72
-19
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ impl BrokerRuntime {
572572
self.consumer_filter_manager.clone(),
573573
self.pop_inflight_message_counter.clone(),
574574
self.store_host,
575+
self.escape_bridge.clone(),
575576
));
576577
let ack_message_processor = ArcMut::new(AckMessageProcessor::new(
577578
self.topic_config_manager.clone(),

rocketmq-broker/src/processor/pop_message_processor.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use tracing::warn;
7272

7373
use crate::broker_error::BrokerError;
7474
use crate::client::manager::consumer_manager::ConsumerManager;
75+
use crate::failover::escape_bridge::EscapeBridge;
7576
use crate::filter::expression_message_filter::ExpressionMessageFilter;
7677
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
7778
use crate::long_polling::long_polling_service::pop_long_polling_service::PopLongPollingService;
@@ -98,7 +99,7 @@ pub struct PopMessageProcessor<MS> {
9899
consumer_filter_manager: Arc<ConsumerFilterManager>,
99100
ck_message_number: AtomicI64,
100101
pop_long_polling_service: ArcMut<PopLongPollingService>,
101-
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
102+
pop_buffer_merge_service: ArcMut<PopBufferMergeService<MS>>,
102103
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
103104
queue_lock_manager: QueueLockManager,
104105
revive_topic: CheetahString,
@@ -118,6 +119,7 @@ impl<MS> PopMessageProcessor<MS> {
118119
consumer_filter_manager: Arc<ConsumerFilterManager>,
119120
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
120121
store_host: SocketAddr,
122+
escape_bridge: ArcMut<EscapeBridge<MS>>,
121123
) -> Self {
122124
let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
123125
broker_config.broker_identity.broker_cluster_name.as_str(),
@@ -139,6 +141,8 @@ impl<MS> PopMessageProcessor<MS> {
139141
revive_topic.clone(),
140142
queue_lock_manager.clone(),
141143
broker_config,
144+
store_host,
145+
escape_bridge,
142146
)),
143147
pop_inflight_message_counter,
144148
queue_lock_manager,
@@ -1178,23 +1182,28 @@ impl<MS> PopMessageProcessor<MS> {
11781182
)
11791183
}
11801184

1181-
pub fn pop_buffer_merge_service(&self) -> &ArcMut<PopBufferMergeService> {
1185+
pub fn pop_buffer_merge_service(&self) -> &ArcMut<PopBufferMergeService<MS>> {
11821186
&self.pop_buffer_merge_service
11831187
}
11841188

1185-
pub fn pop_buffer_merge_service_mut(&mut self) -> &mut ArcMut<PopBufferMergeService> {
1189+
pub fn pop_buffer_merge_service_mut(&mut self) -> &mut ArcMut<PopBufferMergeService<MS>> {
11861190
&mut self.pop_buffer_merge_service
11871191
}
11881192

1189-
pub fn build_ck_msg(&self, ck: &PopCheckPoint, revive_qid: i32) -> MessageExtBrokerInner {
1193+
pub fn build_ck_msg(
1194+
store_host: SocketAddr,
1195+
ck: &PopCheckPoint,
1196+
revive_qid: i32,
1197+
revive_topic: CheetahString,
1198+
) -> MessageExtBrokerInner {
11901199
let mut msg = MessageExtBrokerInner::default();
1191-
msg.set_topic(self.revive_topic.clone());
1200+
msg.set_topic(revive_topic);
11921201
msg.set_body(Bytes::from(ck.to_json().unwrap()));
11931202
msg.message_ext_inner.queue_id = revive_qid;
11941203
msg.set_tags(CheetahString::from_static_str(PopAckConstants::CK_TAG));
11951204
msg.message_ext_inner.born_timestamp = get_current_millis() as i64;
1196-
msg.message_ext_inner.born_host = self.store_host;
1197-
msg.message_ext_inner.store_host = self.store_host;
1205+
msg.message_ext_inner.born_host = store_host;
1206+
msg.message_ext_inner.store_host = store_host;
11981207
msg.set_delay_time_ms((ck.get_revive_time() - PopAckConstants::ACK_TIME_INTERVAL) as u64);
11991208
msg.put_property(
12001209
CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#![allow(unused_variables)]
1818

1919
use std::collections::VecDeque;
20+
use std::net::SocketAddr;
2021
use std::sync::atomic::AtomicBool;
2122
use std::sync::atomic::AtomicI32;
2223
use std::sync::atomic::Ordering;
@@ -32,6 +33,8 @@ use rocketmq_common::common::pop_ack_constants::PopAckConstants;
3233
use rocketmq_common::utils::data_converter::DataConverter;
3334
use rocketmq_common::TimeUtils::get_current_millis;
3435
use rocketmq_rust::ArcMut;
36+
use rocketmq_store::base::message_status_enum::PutMessageStatus;
37+
use rocketmq_store::log_file::MessageStore;
3538
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
3639
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
3740
use rocketmq_store::pop::AckMessage;
@@ -41,9 +44,11 @@ use tracing::error;
4144
use tracing::info;
4245
use tracing::warn;
4346

47+
use crate::failover::escape_bridge::EscapeBridge;
48+
use crate::processor::pop_message_processor::PopMessageProcessor;
4449
use crate::processor::pop_message_processor::QueueLockManager;
4550

46-
pub(crate) struct PopBufferMergeService {
51+
pub(crate) struct PopBufferMergeService<MS> {
4752
buffer: DashMap<CheetahString /* mergeKey */, PopCheckPointWrapper>,
4853
commit_offsets:
4954
DashMap<CheetahString /* topic@cid@queueId */, QueueWithTime<PopCheckPointWrapper>>,
@@ -61,13 +66,17 @@ pub(crate) struct PopBufferMergeService {
6166
master: AtomicBool,
6267
broker_config: Arc<BrokerConfig>,
6368
shutdown: Arc<Notify>,
69+
store_host: SocketAddr,
70+
escape_bridge: ArcMut<EscapeBridge<MS>>,
6471
}
6572

66-
impl PopBufferMergeService {
73+
impl<MS> PopBufferMergeService<MS> {
6774
pub fn new(
6875
revive_topic: CheetahString,
6976
queue_lock_manager: QueueLockManager,
7077
broker_config: Arc<BrokerConfig>,
78+
store_host: SocketAddr,
79+
escape_bridge: ArcMut<EscapeBridge<MS>>,
7180
) -> Self {
7281
let interval = 5;
7382
Self {
@@ -87,11 +96,13 @@ impl PopBufferMergeService {
8796
master: AtomicBool::new(false),
8897
broker_config,
8998
shutdown: Arc::new(Notify::new()),
99+
store_host,
100+
escape_bridge,
90101
}
91102
}
92103
}
93104

94-
impl PopBufferMergeService {
105+
impl<MS: MessageStore> PopBufferMergeService<MS> {
95106
pub fn add_ack(&mut self, revive_qid: i32, ack_msg: &dyn AckMessage) -> bool {
96107
if !self.broker_config.enable_pop_buffer_merge {
97108
return false;
@@ -237,7 +248,7 @@ impl PopBufferMergeService {
237248
self.master.load(Ordering::Acquire)
238249
}
239250

240-
fn scan(&mut self) {
251+
async fn scan(&mut self) {
241252
let start_time = Instant::now();
242253
let mut count = 0;
243254
let mut count_ck = 0;
@@ -275,13 +286,13 @@ impl PopBufferMergeService {
275286
//nothing to do
276287
} else if point_wrapper.is_just_offset() {
277288
if point_wrapper.get_revive_queue_offset() < 0 {
278-
self.put_ck_to_store(point_wrapper, false);
289+
self.put_ck_to_store(point_wrapper, false).await;
279290
count_ck += 1;
280291
}
281292
} else if remove_ck {
282293
if point_wrapper.get_revive_queue_offset() < 0 {
283294
{
284-
self.put_ck_to_store(point_wrapper, false);
295+
self.put_ck_to_store(point_wrapper, false).await;
285296
}
286297
count_ck += 1;
287298
}
@@ -312,7 +323,7 @@ impl PopBufferMergeService {
312323
self.batch_ack_index_list.clear();
313324
}
314325
} else if point_wrapper.get_revive_queue_offset() < 0 {
315-
self.put_ck_to_store(point_wrapper, false);
326+
self.put_ck_to_store(point_wrapper, false).await;
316327
count_ck += 1;
317328
}
318329
}
@@ -377,7 +388,7 @@ impl PopBufferMergeService {
377388
this.commit_offsets.clear();
378389
return;
379390
}
380-
this.mut_from_ref().scan();
391+
this.mut_from_ref().scan().await;
381392
if this.scan_times % this.count_of_second30 == 0 {
382393
this.scan_garbage();
383394
}
@@ -395,7 +406,7 @@ impl PopBufferMergeService {
395406
return;
396407
}
397408
while !this.buffer.is_empty() || this.get_offset_total_size() > 0 {
398-
this.mut_from_ref().scan();
409+
this.mut_from_ref().scan().await;
399410
}
400411
});
401412
}
@@ -410,9 +421,41 @@ impl PopBufferMergeService {
410421
unimplemented!()
411422
}
412423

413-
fn put_ck_to_store(&self, point_wrapper: &PopCheckPointWrapper, flag: bool) {
414-
// Implement the logic to put checkpoint to store
415-
unimplemented!()
424+
async fn put_ck_to_store(&self, point_wrapper: &PopCheckPointWrapper, flag: bool) {
425+
if point_wrapper.get_revive_queue_offset() >= 0 {
426+
return;
427+
}
428+
let msg_inner = PopMessageProcessor::<MS>::build_ck_msg(
429+
self.store_host,
430+
point_wrapper.get_ck(),
431+
point_wrapper.revive_queue_id,
432+
self.revive_topic.clone(),
433+
);
434+
let put_message_result = self
435+
.escape_bridge
436+
.mut_from_ref()
437+
.put_message_to_specific_queue(msg_inner)
438+
.await;
439+
match put_message_result.put_message_status() {
440+
PutMessageStatus::PutOk
441+
| PutMessageStatus::FlushDiskTimeout
442+
| PutMessageStatus::FlushSlaveTimeout
443+
| PutMessageStatus::SlaveNotAvailable => {
444+
return;
445+
}
446+
_ => {}
447+
}
448+
point_wrapper.set_ck_stored(true);
449+
if put_message_result.remote_put() {
450+
point_wrapper.set_revive_queue_offset(0);
451+
} else {
452+
point_wrapper.set_revive_queue_offset(
453+
put_message_result
454+
.append_message_result()
455+
.unwrap()
456+
.logics_offset,
457+
);
458+
}
416459
}
417460

418461
fn put_batch_ack_to_store(

0 commit comments

Comments
 (0)