Skip to content

Commit df10f9a

Browse files
authored
[ISSUE #1136]🚀Support broker receive transaction message-3 (#1218)
1 parent 69ba1f1 commit df10f9a

File tree

3 files changed

+202
-34
lines changed

3 files changed

+202
-34
lines changed

rocketmq-broker/src/processor/end_transaction_processor.rs

+122-30
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use rocketmq_store::base::message_status_enum::PutMessageStatus;
3939
use rocketmq_store::config::broker_role::BrokerRole;
4040
use rocketmq_store::config::message_store_config::MessageStoreConfig;
4141
use rocketmq_store::log_file::MessageStore;
42+
use tracing::warn;
4243

4344
use crate::transaction::operation_result::OperationResult;
4445
use crate::transaction::queue::transactional_message_util::TransactionalMessageUtil;
@@ -84,6 +85,7 @@ where
8485
.decode_command_custom_header::<EndTransactionRequestHeader>()
8586
.expect("EndTransactionRequestHeader decode failed");
8687
if BrokerRole::Slave == self.message_store_config.broker_role {
88+
warn!("Message store is slave mode, so end transaction is forbidden. ");
8789
return Some(RemotingCommand::create_response_command_with_code(
8890
ResponseCode::SlaveNotAvailable,
8991
));
@@ -110,35 +112,36 @@ where
110112

111113
let result = if MessageSysFlag::TRANSACTION_COMMIT_TYPE == request_header.commit_or_rollback
112114
{
113-
let from_transaction_check = request_header.from_transaction_check;
114-
let commit_or_rollback = request_header.commit_or_rollback;
115-
let params = (
116-
request_header.producer_group.to_string(),
117-
request_header.tran_state_table_offset as i64,
118-
request_header.commit_log_offset as i64,
119-
);
120115
let result = self
121116
.transactional_message_service
122-
.commit_message(request_header);
117+
.commit_message(&request_header);
123118
if result.response_code == ResponseCode::Success {
124119
if self.reject_commit_or_rollback(
125-
from_transaction_check,
120+
request_header.from_transaction_check,
126121
result.prepare_message.as_ref().unwrap(),
127122
) {
123+
warn!(
124+
"Message commit fail [producer end]. currentTimeMillis - bornTime > \
125+
checkImmunityTime, msgId={},commitLogOffset={}, wait check",
126+
request_header.msg_id, request_header.commit_log_offset
127+
);
128128
return Some(RemotingCommand::create_response_command_with_code(
129129
ResponseCode::IllegalOperation,
130130
));
131131
}
132-
let res = self.check_prepare_message(result.prepare_message.as_ref(), &params);
132+
let res =
133+
self.check_prepare_message(result.prepare_message.as_ref(), &request_header);
133134
if ResponseCode::from(res.code()) != ResponseCode::Success {
134135
let mut msg_inner =
135136
end_message_transaction(result.prepare_message.as_ref().unwrap());
136137
msg_inner.message_ext_inner.sys_flag = MessageSysFlag::reset_transaction_value(
137138
msg_inner.message_ext_inner.sys_flag,
138-
commit_or_rollback,
139+
request_header.commit_or_rollback,
139140
);
140-
msg_inner.message_ext_inner.queue_offset = params.1;
141-
msg_inner.message_ext_inner.prepared_transaction_offset = params.2;
141+
msg_inner.message_ext_inner.queue_offset =
142+
request_header.tran_state_table_offset as i64;
143+
msg_inner.message_ext_inner.prepared_transaction_offset =
144+
request_header.commit_log_offset as i64;
142145
msg_inner.message_ext_inner.store_timestamp =
143146
result.prepare_message.as_ref().unwrap().store_timestamp;
144147
MessageAccessor::clear_property(
@@ -159,25 +162,25 @@ where
159162
OperationResult::default()
160163
}
161164
} else if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == request_header.commit_or_rollback {
162-
let from_transaction_check = request_header.from_transaction_check;
163-
let params = (
164-
request_header.producer_group.to_string(),
165-
request_header.tran_state_table_offset as i64,
166-
request_header.commit_log_offset as i64,
167-
);
168165
let result = self
169166
.transactional_message_service
170-
.rollback_message(request_header);
167+
.rollback_message(&request_header);
171168
if result.response_code == ResponseCode::Success {
172169
if self.reject_commit_or_rollback(
173-
from_transaction_check,
170+
request_header.from_transaction_check,
174171
result.prepare_message.as_ref().unwrap(),
175172
) {
173+
warn!(
174+
"Message commit fail [producer end]. currentTimeMillis - bornTime > \
175+
checkImmunityTime, msgId={},commitLogOffset={}, wait check",
176+
request_header.msg_id, request_header.commit_log_offset
177+
);
176178
return Some(RemotingCommand::create_response_command_with_code(
177179
ResponseCode::IllegalOperation,
178180
));
179181
}
180-
let res = self.check_prepare_message(result.prepare_message.as_ref(), &params);
182+
let res =
183+
self.check_prepare_message(result.prepare_message.as_ref(), &request_header);
181184
if ResponseCode::from(res.code()) == ResponseCode::Success {
182185
let _ = self
183186
.transactional_message_service
@@ -223,7 +226,8 @@ where
223226
fn check_prepare_message(
224227
&self,
225228
message_ext: Option<&MessageExt>,
226-
params: &(String, i64, i64),
229+
// params: &(String, i64, i64),
230+
request_header: &EndTransactionRequestHeader,
227231
) -> RemotingCommand {
228232
let mut command = RemotingCommand::create_response_command();
229233
if let Some(message_ext) = message_ext {
@@ -236,17 +240,17 @@ where
236240
return command;
237241
}
238242
let pgroup = pgroup_read.unwrap();
239-
if pgroup != params.0 {
243+
if pgroup != request_header.producer_group.as_str() {
240244
command.set_code_mut(ResponseCode::SystemError);
241245
command.set_remark_mut("The producer group wrong");
242246
return command;
243247
}
244-
if message_ext.queue_offset != params.1 {
248+
if message_ext.queue_offset != request_header.tran_state_table_offset as i64 {
245249
command.set_code_mut(ResponseCode::SystemError);
246250
command.set_remark_mut("The transaction state table offset wrong");
247251
return command;
248252
}
249-
if message_ext.commit_log_offset != params.2 {
253+
if message_ext.commit_log_offset != request_header.commit_log_offset as i64 {
250254
command.set_code_mut(ResponseCode::SystemError);
251255
command.set_remark_mut("The commit log offset wrong");
252256
return command;
@@ -372,10 +376,11 @@ fn end_message_transaction(msg_ext: &MessageExt) -> MessageExtBrokerInner {
372376
} else {
373377
TopicFilterType::SingleTag
374378
};
375-
let tags_code_value = MessageExtBrokerInner::tags_string2tags_code(
376-
&topic_filter_type,
377-
msg_ext.get_tags().as_ref().unwrap(),
378-
);
379+
let tags_code_value = if let Some(tags) = msg_ext.get_tags() {
380+
MessageExtBrokerInner::tags_string2tags_code(&topic_filter_type, tags.as_str())
381+
} else {
382+
0
383+
};
379384
msg_inner.tags_code = tags_code_value;
380385
MessageAccessor::set_properties(&mut msg_inner, msg_ext.get_properties().clone());
381386
msg_inner.properties_string =
@@ -384,3 +389,90 @@ fn end_message_transaction(msg_ext: &MessageExt) -> MessageExtBrokerInner {
384389
MessageAccessor::clear_property(&mut msg_inner, MessageConst::PROPERTY_REAL_QUEUE_ID);
385390
msg_inner
386391
}
392+
393+
#[cfg(test)]
394+
mod tests {
395+
use super::*;
396+
397+
#[test]
398+
fn end_message_transaction_with_valid_message() {
399+
let msg_ext = MessageExt::default();
400+
let msg_inner = end_message_transaction(&msg_ext);
401+
assert_eq!(
402+
msg_inner.get_topic(),
403+
&msg_ext
404+
.get_user_property(&CheetahString::from_static_str(
405+
MessageConst::PROPERTY_REAL_TOPIC
406+
))
407+
.unwrap_or_default()
408+
);
409+
assert_eq!(
410+
msg_inner.message_ext_inner.queue_id,
411+
msg_ext
412+
.get_user_property(&CheetahString::from_static_str(
413+
MessageConst::PROPERTY_REAL_QUEUE_ID
414+
))
415+
.unwrap_or_default()
416+
.parse::<i32>()
417+
.unwrap_or_default()
418+
);
419+
assert_eq!(msg_inner.get_body(), msg_ext.get_body());
420+
assert_eq!(msg_inner.get_flag(), msg_ext.get_flag());
421+
assert_eq!(
422+
msg_inner.message_ext_inner.born_timestamp,
423+
msg_ext.born_timestamp
424+
);
425+
assert_eq!(msg_inner.message_ext_inner.born_host, msg_ext.born_host);
426+
assert_eq!(msg_inner.message_ext_inner.store_host, msg_ext.store_host);
427+
assert_eq!(
428+
msg_inner.message_ext_inner.reconsume_times,
429+
msg_ext.reconsume_times
430+
);
431+
assert!(msg_inner.is_wait_store_msg_ok());
432+
/* assert_eq!(
433+
msg_inner.get_transaction_id(),
434+
&msg_ext
435+
.get_user_property(&CheetahString::from_static_str(
436+
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
437+
))
438+
.unwrap_or_default()
439+
);*/
440+
assert_eq!(msg_inner.message_ext_inner.sys_flag, msg_ext.sys_flag);
441+
/* assert_eq!(
442+
msg_inner.tags_code,
443+
MessageExtBrokerInner::tags_string2tags_code(
444+
&TopicFilterType::SingleTag,
445+
msg_ext.get_tags().as_ref().unwrap()
446+
)
447+
);*/
448+
assert_eq!(msg_inner.get_properties(), msg_ext.get_properties());
449+
assert_eq!(
450+
msg_inner.properties_string,
451+
message_decoder::message_properties_to_string(msg_ext.get_properties())
452+
);
453+
}
454+
455+
#[test]
456+
fn end_message_transaction_with_empty_body() {
457+
let mut msg_ext = MessageExt::default();
458+
//msg_ext.set_body(None);
459+
let msg_inner = end_message_transaction(&msg_ext);
460+
assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));
461+
}
462+
463+
#[test]
464+
fn end_message_transaction_with_missing_properties() {
465+
let mut msg_ext = MessageExt::default();
466+
msg_ext.put_property(
467+
CheetahString::from_static_str(MessageConst::PROPERTY_REAL_TOPIC),
468+
CheetahString::empty(),
469+
);
470+
msg_ext.put_property(
471+
CheetahString::from_static_str(MessageConst::PROPERTY_REAL_QUEUE_ID),
472+
CheetahString::empty(),
473+
);
474+
let msg_inner = end_message_transaction(&msg_ext);
475+
assert!(msg_inner.get_topic().is_empty());
476+
assert_eq!(msg_inner.message_ext_inner.queue_id, 0);
477+
}
478+
}

rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,16 @@ where
208208
}
209209
}
210210

211-
fn commit_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult {
211+
#[inline]
212+
fn commit_message(&mut self, request_header: &EndTransactionRequestHeader) -> OperationResult {
212213
self.get_half_message_by_offset(request_header.commit_log_offset as i64)
213214
}
214215

215-
fn rollback_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult {
216+
#[inline]
217+
fn rollback_message(
218+
&mut self,
219+
request_header: &EndTransactionRequestHeader,
220+
) -> OperationResult {
216221
self.get_half_message_by_offset(request_header.commit_log_offset as i64)
217222
}
218223

rocketmq-broker/src/transaction/transactional_message_service.rs

+73-2
Original file line numberDiff line numberDiff line change
@@ -22,33 +22,104 @@ use rocketmq_store::base::message_result::PutMessageResult;
2222
use crate::transaction::operation_result::OperationResult;
2323
use crate::transaction::transaction_metrics::TransactionMetrics;
2424

25+
/// Trait defining the local transactional message service.
26+
/// This trait provides methods for preparing, committing, rolling back, and checking transactional
27+
/// messages, as well as managing the state of the transactional message service.
2528
#[trait_variant::make(TransactionalMessageService: Send)]
2629
pub trait TransactionalMessageServiceLocal: Sync + 'static {
30+
/// Prepares a transactional message.
31+
///
32+
/// # Arguments
33+
///
34+
/// * `message_inner` - The inner message to be prepared.
35+
///
36+
/// # Returns
37+
///
38+
/// A `PutMessageResult` indicating the result of the message preparation.
2739
async fn prepare_message(&mut self, message_inner: MessageExtBrokerInner) -> PutMessageResult;
2840

41+
/// Asynchronously prepares a transactional message.
42+
///
43+
/// # Arguments
44+
///
45+
/// * `message_inner` - The inner message to be prepared.
46+
///
47+
/// # Returns
48+
///
49+
/// A `PutMessageResult` indicating the result of the message preparation.
2950
async fn async_prepare_message(
3051
&mut self,
3152
message_inner: MessageExtBrokerInner,
3253
) -> PutMessageResult;
3354

55+
/// Deletes a prepared transactional message.
56+
///
57+
/// # Arguments
58+
///
59+
/// * `message_ext` - The external message to be deleted.
60+
///
61+
/// # Returns
62+
///
63+
/// A boolean indicating whether the message was successfully deleted.
3464
async fn delete_prepare_message(&mut self, message_ext: &MessageExt) -> bool;
3565

36-
fn commit_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult;
66+
/// Commits a transactional message.
67+
///
68+
/// # Arguments
69+
///
70+
/// * `request_header` - The request header containing the transaction details.
71+
///
72+
/// # Returns
73+
///
74+
/// An `OperationResult` indicating the result of the commit operation.
75+
fn commit_message(&mut self, request_header: &EndTransactionRequestHeader) -> OperationResult;
3776

38-
fn rollback_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult;
77+
/// Rolls back a transactional message.
78+
///
79+
/// # Arguments
80+
///
81+
/// * `request_header` - The request header containing the transaction details.
82+
///
83+
/// # Returns
84+
///
85+
/// An `OperationResult` indicating the result of the rollback operation.
86+
fn rollback_message(&mut self, request_header: &EndTransactionRequestHeader)
87+
-> OperationResult;
3988

89+
/// Checks the state of transactional messages.
90+
///
91+
/// # Arguments
92+
///
93+
/// * `transaction_timeout` - The timeout for the transaction.
94+
/// * `transaction_check_max` - The maximum number of transaction checks.
4095
fn check(
4196
&self,
4297
transaction_timeout: u64,
4398
transaction_check_max: i32,
4499
// listener: AbstractTransactionalMessageCheckListener,
45100
);
46101

102+
/// Opens the transactional message service.
103+
///
104+
/// # Returns
105+
///
106+
/// A boolean indicating whether the service was successfully opened.
47107
fn open(&self) -> bool;
48108

109+
/// Closes the transactional message service.
49110
fn close(&self);
50111

112+
/// Gets the transaction metrics.
113+
///
114+
/// # Returns
115+
///
116+
/// A reference to the `TransactionMetrics`.
51117
fn get_transaction_metrics(&self) -> &TransactionMetrics;
52118

119+
/// Sets the transaction metrics.
120+
///
121+
/// # Arguments
122+
///
123+
/// * `transaction_metrics` - The transaction metrics to be set.
53124
fn set_transaction_metrics(&mut self, transaction_metrics: TransactionMetrics);
54125
}

0 commit comments

Comments
 (0)