Skip to content

Commit 5f6a7f5

Browse files
authored
[ISSUE #699]🚀Support pull message result handle (#701)
1 parent f60b39c commit 5f6a7f5

17 files changed

+607
-18
lines changed

rocketmq-broker/src/broker_runtime.rs

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use crate::client::manager::producer_manager::ProducerManager;
5454
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
5555
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
5656
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
57+
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
5758
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
5859
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
5960
use crate::out_api::broker_outer_api::BrokerOuterAPI;
@@ -372,6 +373,7 @@ impl BrokerRuntime {
372373
self.consumer_manager.clone(),
373374
self.consumer_filter_manager.clone(),
374375
Arc::new(self.consumer_offset_manager.clone()),
376+
Arc::new(BroadcastOffsetManager::default()),
375377
self.message_store.as_ref().unwrap().clone(),
376378
);
377379

rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs

+15
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,18 @@
1616
*/
1717
#[derive(Debug, Default)]
1818
pub struct BroadcastOffsetManager {}
19+
20+
#[allow(unused_variables)]
21+
impl BroadcastOffsetManager {
22+
pub fn query_init_offset(
23+
&self,
24+
topic: &str,
25+
group_id: &str,
26+
queue_id: i32,
27+
client_id: &str,
28+
request_offset: i64,
29+
from_proxy: bool,
30+
) -> i64 {
31+
unimplemented!()
32+
}
33+
}

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,23 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
9090
&mut response,
9191
client_address.as_str(),
9292
);
93-
/* self.execute_consume_message_hook_before(
93+
self.execute_consume_message_hook_before(
9494
&request,
9595
&request_header,
9696
&get_message_result,
9797
broker_allow_suspend,
9898
From::from(response.code()),
99-
);*/
100-
99+
);
100+
/*let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
101+
let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic(
102+
&request_header,
103+
response_header.unwrap(),
104+
&mut mapping_context,
105+
ResponseCode::from(response.code()),
106+
);
107+
if rewrite_result.is_some() {
108+
return rewrite_result;
109+
}*/
101110
None
102111
}
103112
}

0 commit comments

Comments
 (0)