Skip to content

Commit de83e75

Browse files
committed
[ISSUE #1230]🚀Support broker receive transaction message-4
1 parent c2281c4 commit de83e75

10 files changed

+402
-4
lines changed

rocketmq-broker/src/broker_runtime.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use crate::broker::broker_hook::BrokerShutdownHook;
5454
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
5555
use crate::client::manager::consumer_manager::ConsumerManager;
5656
use crate::client::manager::producer_manager::ProducerManager;
57+
use crate::client::net::broker_to_client::Broker2Client;
5758
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
5859
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
5960
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
@@ -126,7 +127,9 @@ pub(crate) struct BrokerRuntime {
126127
#[cfg(feature = "local_file_store")]
127128
transactional_message_service:
128129
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
129-
transactional_message_check_listener: Option<Arc<DefaultTransactionalMessageCheckListener>>,
130+
#[cfg(feature = "local_file_store")]
131+
transactional_message_check_listener:
132+
Option<Arc<DefaultTransactionalMessageCheckListener<DefaultMessageStore>>>,
130133
transactional_message_check_service: Option<Arc<TransactionalMessageCheckService>>,
131134
transaction_metrics_flush_service: Option<Arc<TransactionMetricsFlushService>>,
132135
}
@@ -689,7 +692,13 @@ impl BrokerRuntime {
689692
}
690693
}
691694
self.transactional_message_check_listener =
692-
Some(Arc::new(DefaultTransactionalMessageCheckListener));
695+
Some(Arc::new(DefaultTransactionalMessageCheckListener::new(
696+
self.broker_config.clone(),
697+
self.producer_manager.clone(),
698+
Broker2Client,
699+
self.topic_config_manager.clone(),
700+
self.message_store.as_ref().cloned().unwrap(),
701+
)));
693702
self.transactional_message_check_service = Some(Arc::new(TransactionalMessageCheckService));
694703
self.transaction_metrics_flush_service = Some(Arc::new(TransactionMetricsFlushService));
695704
}

rocketmq-broker/src/client/manager/producer_manager.rs

+23
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717

1818
use std::collections::HashMap;
19+
use std::sync::atomic::AtomicI32;
20+
use std::sync::Arc;
1921

2022
use cheetah_string::CheetahString;
2123
use rocketmq_common::TimeUtils::get_current_millis;
@@ -31,13 +33,15 @@ pub struct ProducerManager {
3133
HashMap<CheetahString /* group name */, HashMap<Channel, ClientChannelInfo>>,
3234
>,
3335
client_channel_table: parking_lot::Mutex<HashMap<CheetahString, Channel /* client ip:port */>>,
36+
positive_atomic_counter: Arc<AtomicI32>,
3437
}
3538

3639
impl ProducerManager {
3740
pub fn new() -> Self {
3841
Self {
3942
group_channel_table: parking_lot::Mutex::new(HashMap::new()),
4043
client_channel_table: parking_lot::Mutex::new(HashMap::new()),
44+
positive_atomic_counter: Arc::new(Default::default()),
4145
}
4246
}
4347
}
@@ -116,4 +120,23 @@ impl ProducerManager {
116120
pub fn find_channel(&self, client_id: &str) -> Option<Channel> {
117121
self.client_channel_table.lock().get(client_id).cloned()
118122
}
123+
124+
pub fn get_available_channel(&self, group: Option<&CheetahString>) -> Option<Channel> {
125+
let group = group?;
126+
let group_channel_table = self.group_channel_table.lock();
127+
let channel_map = group_channel_table.get(group);
128+
if let Some(channel_map) = channel_map {
129+
if channel_map.is_empty() {
130+
return None;
131+
}
132+
let channels = channel_map.keys().collect::<Vec<&Channel>>();
133+
let index = self
134+
.positive_atomic_counter
135+
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
136+
let index = index.unsigned_abs() as usize % channels.len();
137+
let channel = channels[index].clone();
138+
return Some(channel);
139+
}
140+
None
141+
}
119142
}

rocketmq-broker/src/client/net/broker_to_client.rs

+31
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,16 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use cheetah_string::CheetahString;
18+
use rocketmq_common::common::message::message_ext::MessageExt;
19+
use rocketmq_common::MessageDecoder;
20+
use rocketmq_remoting::code::request_code::RequestCode;
1721
use rocketmq_remoting::net::channel::Channel;
22+
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
1823
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
1924

2025
use crate::error::BrokerError::BrokerClientError;
26+
use crate::error::BrokerError::BrokerCommonError;
2127
use crate::Result;
2228

2329
#[derive(Default, Clone)]
@@ -35,4 +41,29 @@ impl Broker2Client {
3541
Err(e) => Err(BrokerClientError(e)),
3642
}
3743
}
44+
45+
pub async fn check_producer_transaction_state(
46+
&self,
47+
_group: &CheetahString,
48+
channel: &mut Channel,
49+
request_header: CheckTransactionStateRequestHeader,
50+
message_ext: MessageExt,
51+
) -> Result<()> {
52+
let mut request = RemotingCommand::create_request_command(
53+
RequestCode::CheckTransactionState,
54+
request_header,
55+
);
56+
match MessageDecoder::encode(&message_ext, false) {
57+
Ok(body) => {
58+
request.set_body_mut_ref(body);
59+
}
60+
Err(e) => {
61+
return Err(BrokerCommonError(e));
62+
}
63+
}
64+
match channel.send_one_way(request, 100).await {
65+
Ok(_) => Ok(()),
66+
Err(e) => Err(BrokerClientError(e)),
67+
}
68+
}
3869
}

rocketmq-broker/src/error.rs

+4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
*/
1717
use thiserror::Error;
1818

19+
#[allow(clippy::enum_variant_names)]
1920
#[derive(Debug, Error)]
2021
pub enum BrokerError {
2122
#[error("broker client error: {0}")]
2223
BrokerClientError(#[from] rocketmq_remoting::error::Error),
2324

25+
#[error("Common error: {0}")]
26+
BrokerCommonError(#[from] rocketmq_common::error::Error),
27+
2428
#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
2529
MQBrokerError(i32, String, String),
2630
}

rocketmq-broker/src/transaction.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ pub(crate) mod operation_result;
1818
pub(crate) mod queue;
1919
pub(crate) mod transaction_metrics;
2020
pub(crate) mod transaction_metrics_flush_service;
21+
pub(crate) mod transactional_message_check_listener;
2122
pub(crate) mod transactional_message_check_service;
2223
pub(crate) mod transactional_message_service;

0 commit comments

Comments
 (0)