Skip to content

Commit 01cd6f7

Browse files
authored
[ISSUE mxsm#702]🚀Support pull message result handle-2🚀 (mxsm#706)
1 parent 24c6f65 commit 01cd6f7

File tree

10 files changed

+369
-136
lines changed

10 files changed

+369
-136
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub(crate) struct BrokerRuntime {
9494
broker_runtime: Option<RocketMQRuntime>,
9595
producer_manager: Arc<ProducerManager>,
9696
consumer_manager: Arc<ConsumerManager>,
97+
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
9798
drop: Arc<AtomicBool>,
9899
shutdown: Arc<AtomicBool>,
99100
shutdown_hook: Option<BrokerShutdownHook>,
@@ -124,6 +125,7 @@ impl Clone for BrokerRuntime {
124125
broker_runtime: None,
125126
producer_manager: self.producer_manager.clone(),
126127
consumer_manager: self.consumer_manager.clone(),
128+
broadcast_offset_manager: self.broadcast_offset_manager.clone(),
127129
drop: self.drop.clone(),
128130
shutdown: self.shutdown.clone(),
129131
shutdown_hook: self.shutdown_hook.clone(),
@@ -196,6 +198,7 @@ impl BrokerRuntime {
196198
broker_runtime: Some(runtime),
197199
producer_manager,
198200
consumer_manager,
201+
broadcast_offset_manager: Arc::new(Default::default()),
199202
drop: Arc::new(AtomicBool::new(false)),
200203
shutdown: Arc::new(AtomicBool::new(false)),
201204
shutdown_hook: None,
@@ -361,6 +364,10 @@ impl BrokerRuntime {
361364
);
362365
let pull_message_result_handler = DefaultPullMessageResultHandler::new(
363366
Arc::new(self.topic_config_manager.clone()),
367+
Arc::new(self.consumer_offset_manager.clone()),
368+
self.consumer_manager.clone(),
369+
self.broadcast_offset_manager.clone(),
370+
self.broker_stats_manager.clone(),
364371
self.broker_config.clone(),
365372
Arc::new(Default::default()),
366373
);

rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,16 @@ impl BroadcastOffsetManager {
3030
) -> i64 {
3131
unimplemented!()
3232
}
33+
34+
pub fn update_offset(
35+
&self,
36+
topic: &str,
37+
group: &str,
38+
queue_id: i32,
39+
offset: i64,
40+
client_id: &str,
41+
from_proxy: bool,
42+
) {
43+
unimplemented!()
44+
}
3345
}

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,23 @@ impl ConfigManager for ConsumerOffsetManager {
180180

181181
#[allow(unused_variables)]
182182
impl ConsumerOffsetManager {
183+
pub fn commit_pull_offset(
184+
&self,
185+
_client_host: SocketAddr,
186+
group: &str,
187+
topic: &str,
188+
queue_id: i32,
189+
offset: i64,
190+
) {
191+
let key = format!("{}{}{}", topic, TOPIC_GROUP_SEPARATOR, group);
192+
self.consumer_offset_wrapper
193+
.lock()
194+
.pull_offset_table
195+
.entry(key)
196+
.or_default()
197+
.insert(queue_id, offset);
198+
}
199+
183200
pub fn query_then_erase_reset_offset(
184201
&self,
185202
topic: &str,

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

Lines changed: 178 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,22 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::net::SocketAddr;
1718
use std::sync::Arc;
1819

20+
use bytes::Bytes;
21+
use bytes::BytesMut;
1922
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2023
use rocketmq_common::common::mix_all::MASTER_ID;
24+
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
2125
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
2226
use rocketmq_remoting::code::response_code::ResponseCode;
2327
use rocketmq_remoting::net::channel::Channel;
2428
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
2529
use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader;
2630
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
2731
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
32+
use rocketmq_remoting::protocol::request_source::RequestSource;
2833
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::TopicQueueMappingContext;
2934
use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
3035
use rocketmq_remoting::protocol::NamespaceUtil;
@@ -36,25 +41,42 @@ use rocketmq_store::stats::stats_type::StatsType;
3641
use tracing::debug;
3742
use tracing::info;
3843

44+
use crate::client::manager::consumer_manager::ConsumerManager;
3945
use crate::mqtrace::consume_message_context::ConsumeMessageContext;
4046
use crate::mqtrace::consume_message_hook::ConsumeMessageHook;
47+
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
48+
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
49+
use crate::processor::pull_message_processor::is_broadcast;
50+
use crate::processor::pull_message_processor::rewrite_response_for_static_topic;
4151
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
4252
use crate::topic::manager::topic_config_manager::TopicConfigManager;
4353

4454
pub struct DefaultPullMessageResultHandler {
4555
topic_config_manager: Arc<TopicConfigManager>,
56+
consumer_offset_manager: Arc<ConsumerOffsetManager>,
57+
consumer_manager: Arc<ConsumerManager>,
58+
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
59+
broker_stats_manager: Arc<BrokerStatsManager>,
4660
broker_config: Arc<BrokerConfig>,
4761
consume_message_hook_list: Arc<Vec<Box<dyn ConsumeMessageHook>>>,
4862
}
4963

5064
impl DefaultPullMessageResultHandler {
5165
pub fn new(
5266
topic_config_manager: Arc<TopicConfigManager>,
67+
consumer_offset_manager: Arc<ConsumerOffsetManager>,
68+
consumer_manager: Arc<ConsumerManager>,
69+
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
70+
broker_stats_manager: Arc<BrokerStatsManager>,
5371
broker_config: Arc<BrokerConfig>,
5472
consume_message_hook_list: Arc<Vec<Box<dyn ConsumeMessageHook>>>,
5573
) -> Self {
5674
Self {
5775
topic_config_manager,
76+
consumer_offset_manager,
77+
consumer_manager,
78+
broadcast_offset_manager,
79+
broker_stats_manager,
5880
broker_config,
5981
consume_message_hook_list,
6082
}
@@ -74,7 +96,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
7496
broker_allow_suspend: bool,
7597
message_filter: Box<dyn MessageFilter>,
7698
mut response: RemotingCommand,
77-
mapping_context: TopicQueueMappingContext,
99+
mut mapping_context: TopicQueueMappingContext,
78100
begin_time_mills: u64,
79101
) -> Option<RemotingCommand> {
80102
let client_address = channel.remote_address().to_string();
@@ -90,28 +112,95 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
90112
&mut response,
91113
client_address.as_str(),
92114
);
115+
let code = From::from(response.code());
93116
self.execute_consume_message_hook_before(
94117
&request,
95118
&request_header,
96119
&get_message_result,
97120
broker_allow_suspend,
98-
From::from(response.code()),
121+
code,
99122
);
100-
/*let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
101-
let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic(
123+
let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
124+
let rewrite_result = rewrite_response_for_static_topic(
102125
&request_header,
103126
response_header.unwrap(),
104127
&mut mapping_context,
105-
ResponseCode::from(response.code()),
128+
code,
106129
);
107130
if rewrite_result.is_some() {
108131
return rewrite_result;
109-
}*/
110-
None
132+
}
133+
self.update_broadcast_pulled_offset(
134+
request_header.topic.as_str(),
135+
request_header.consumer_group.as_str(),
136+
request_header.queue_id.unwrap(),
137+
&request_header,
138+
&channel,
139+
Some(&response),
140+
get_message_result.next_begin_offset(),
141+
);
142+
self.try_commit_offset(
143+
broker_allow_suspend,
144+
&request_header,
145+
get_message_result.next_begin_offset(),
146+
channel.remote_address(),
147+
);
148+
149+
match code {
150+
ResponseCode::Success => {
151+
self.broker_stats_manager.inc_group_get_nums(
152+
request_header.consumer_group.as_str(),
153+
request_header.topic.as_str(),
154+
get_message_result.message_count(),
155+
);
156+
self.broker_stats_manager.inc_group_get_size(
157+
request_header.consumer_group.as_str(),
158+
request_header.topic.as_str(),
159+
get_message_result.buffer_total_size(),
160+
);
161+
self.broker_stats_manager.inc_broker_get_nums(
162+
request_header.topic.as_str(),
163+
get_message_result.message_count(),
164+
);
165+
if self.broker_config.transfer_msg_by_heap {
166+
let body = self.read_get_message_result(
167+
&get_message_result,
168+
request_header.consumer_group.as_str(),
169+
request_header.topic.as_str(),
170+
request_header.queue_id.unwrap(),
171+
);
172+
return Some(response.set_body(body));
173+
} /*else {
174+
None
175+
}*/
176+
None
177+
}
178+
ResponseCode::PullNotFound => Some(response),
179+
ResponseCode::PullOffsetMoved => Some(response),
180+
ResponseCode::PullRetryImmediately => Some(response),
181+
_ => None,
182+
}
111183
}
112184
}
113185

114186
impl DefaultPullMessageResultHandler {
187+
fn read_get_message_result(
188+
&self,
189+
get_message_result: &GetMessageResult,
190+
_group: &str,
191+
_topic: &str,
192+
_queue_id: i32,
193+
) -> Option<Bytes> {
194+
let mut bytes_mut =
195+
BytesMut::with_capacity(get_message_result.buffer_total_size() as usize);
196+
for msg in get_message_result.message_mapped_list() {
197+
let data = &msg.mapped_file.as_ref().unwrap().get_mapped_file()
198+
[msg.start_offset as usize..(msg.start_offset + msg.size as u64) as usize];
199+
bytes_mut.extend_from_slice(data);
200+
}
201+
Some(bytes_mut.freeze())
202+
}
203+
115204
fn execute_consume_message_hook_before(
116205
&self,
117206
request: &RemotingCommand,
@@ -306,4 +395,86 @@ impl DefaultPullMessageResultHandler {
306395
}
307396
response.set_command_custom_header_ref(response_header)
308397
}
398+
399+
fn try_commit_offset(
400+
&self,
401+
broker_allow_suspend: bool,
402+
request_header: &PullMessageRequestHeader,
403+
next_offset: i64,
404+
client_address: SocketAddr,
405+
) {
406+
self.consumer_offset_manager.commit_pull_offset(
407+
client_address,
408+
request_header.consumer_group.as_str(),
409+
request_header.topic.as_str(),
410+
request_header.queue_id.unwrap(),
411+
next_offset,
412+
);
413+
414+
let mut store_offset_enable = broker_allow_suspend;
415+
let has_commit_offset_flag =
416+
PullSysFlag::has_commit_offset_flag(request_header.sys_flag as u32);
417+
store_offset_enable = store_offset_enable && has_commit_offset_flag;
418+
if store_offset_enable {
419+
self.consumer_offset_manager.commit_offset(
420+
client_address,
421+
request_header.consumer_group.as_str(),
422+
request_header.topic.as_str(),
423+
request_header.queue_id.unwrap(),
424+
request_header.commit_offset,
425+
);
426+
}
427+
}
428+
429+
fn update_broadcast_pulled_offset(
430+
&self,
431+
topic: &str,
432+
group: &str,
433+
queue_id: i32,
434+
request_header: &PullMessageRequestHeader,
435+
channel: &Channel,
436+
response: Option<&RemotingCommand>,
437+
next_begin_offset: i64,
438+
) {
439+
if response.is_none() || !self.broker_config.enable_broadcast_offset_store {
440+
return;
441+
}
442+
let proxy_pull_broadcast =
443+
request_header.request_source == Some(RequestSource::ProxyForBroadcast.get_value());
444+
let consumer_group_info = self.consumer_manager.get_consumer_group_info(group);
445+
446+
if is_broadcast(proxy_pull_broadcast, consumer_group_info.as_ref()) {
447+
let mut offset = request_header.queue_offset;
448+
if let Some(response) = response {
449+
if ResponseCode::from(response.code()) == ResponseCode::PullOffsetMoved {
450+
offset = next_begin_offset;
451+
}
452+
}
453+
454+
let client_id = if proxy_pull_broadcast {
455+
request_header
456+
.proxy_forward_client_id
457+
.clone()
458+
.unwrap_or_default()
459+
} else if let Some(ref consumer_group_info) = consumer_group_info {
460+
if let Some(ref client_channel_info) =
461+
consumer_group_info.find_channel_by_channel(channel)
462+
{
463+
client_channel_info.client_id().clone()
464+
} else {
465+
return;
466+
}
467+
} else {
468+
return;
469+
};
470+
self.broadcast_offset_manager.update_offset(
471+
topic,
472+
group,
473+
queue_id,
474+
offset,
475+
client_id.as_str(),
476+
proxy_pull_broadcast,
477+
);
478+
}
479+
}
309480
}

0 commit comments

Comments
 (0)