File tree 10 files changed +43
-31
lines changed
src/producer/producer_impl
rocketmq-common/src/common
10 files changed +43
-31
lines changed Original file line number Diff line number Diff line change @@ -429,14 +429,14 @@ mod tests {
429
429
msg_ext. reconsume_times
430
430
) ;
431
431
assert ! ( msg_inner. is_wait_store_msg_ok( ) ) ;
432
- /* assert_eq!(
432
+ assert_eq ! (
433
433
msg_inner. get_transaction_id( ) ,
434
- & msg_ext
434
+ msg_ext
435
435
. get_user_property( & CheetahString :: from_static_str(
436
436
MessageConst :: PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
437
437
) )
438
- .unwrap_or_default ()
439
- );*/
438
+ . as_ref ( )
439
+ ) ;
440
440
assert_eq ! ( msg_inner. message_ext_inner. sys_flag, msg_ext. sys_flag) ;
441
441
/* assert_eq!(
442
442
msg_inner.tags_code,
Original file line number Diff line number Diff line change @@ -79,13 +79,12 @@ impl TransactionalMessageUtil {
79
79
. message_ext_inner
80
80
. set_born_timestamp ( msg_ext. born_timestamp ) ;
81
81
msg_inner. message_ext_inner . set_born_host ( msg_ext. born_host ) ;
82
- msg_inner. set_transaction_id (
83
- msg_ext
84
- . get_property ( & CheetahString :: from_static_str (
85
- MessageConst :: PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX ,
86
- ) )
87
- . unwrap_or_default ( ) ,
88
- ) ;
82
+
83
+ if let Some ( transaction_id) = msg_ext. get_property ( & CheetahString :: from_static_str (
84
+ MessageConst :: PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX ,
85
+ ) ) {
86
+ msg_inner. set_transaction_id ( transaction_id) ;
87
+ }
89
88
90
89
MessageAccessor :: set_properties ( & mut msg_inner, msg_ext. get_properties ( ) . clone ( ) ) ;
91
90
MessageAccessor :: put_property (
@@ -168,11 +167,11 @@ mod tests {
168
167
assert_eq ! ( msg_inner. get_flag( ) , msg_ext. get_flag( ) ) ;
169
168
assert_eq ! (
170
169
msg_inner. get_transaction_id( ) ,
171
- & msg_ext
170
+ msg_ext
172
171
. get_property( & CheetahString :: from_static_str(
173
172
MessageConst :: PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
174
173
) )
175
- . unwrap_or_default ( )
174
+ . as_ref ( )
176
175
) ;
177
176
}
178
177
Original file line number Diff line number Diff line change @@ -92,13 +92,18 @@ impl TransactionListener for TransactionListenerImpl {
92
92
. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: AcqRel ) ;
93
93
let status = value % 3 ;
94
94
let mut guard = self . local_trans . lock ( ) ;
95
- guard. insert ( msg. get_transaction_id ( ) . clone ( ) , status) ;
95
+ guard. insert (
96
+ msg. get_transaction_id ( ) . cloned ( ) . unwrap_or_default ( ) ,
97
+ status,
98
+ ) ;
96
99
LocalTransactionState :: Unknown
97
100
}
98
101
99
102
fn check_local_transaction ( & self , msg : & MessageExt ) -> LocalTransactionState {
100
103
let mut guard = self . local_trans . lock ( ) ;
101
- let status = guard. get ( msg. get_transaction_id ( ) ) . unwrap_or ( & -1 ) ;
104
+ let status = guard
105
+ . get ( & msg. get_transaction_id ( ) . cloned ( ) . unwrap_or_default ( ) )
106
+ . unwrap_or ( & -1 ) ;
102
107
match status {
103
108
1 => LocalTransactionState :: CommitMessage ,
104
109
2 => LocalTransactionState :: RollbackMessage ,
Original file line number Diff line number Diff line change @@ -2029,7 +2029,7 @@ impl DefaultMQProducerImpl {
2029
2029
producer_group : self . producer_config . producer_group ( ) . clone ( ) ,
2030
2030
message : msg,
2031
2031
msg_id : msg_id. clone ( ) ,
2032
- transaction_id : msg. get_transaction_id ( ) . clone ( ) ,
2032
+ transaction_id : msg. get_transaction_id ( ) . cloned ( ) . unwrap_or_default ( ) ,
2033
2033
broker_addr : broker_addr. clone ( ) ,
2034
2034
from_transaction_check,
2035
2035
transaction_state : local_transaction_state,
Original file line number Diff line number Diff line change @@ -302,7 +302,7 @@ pub trait MessageTrait: Any + Display + Debug {
302
302
/// # Returns
303
303
///
304
304
/// A reference to the transaction ID as a `&str`.
305
- fn get_transaction_id ( & self ) -> & CheetahString ;
305
+ fn get_transaction_id ( & self ) -> Option < & CheetahString > ;
306
306
307
307
/// Sets the transaction ID for the message.
308
308
///
Original file line number Diff line number Diff line change @@ -179,11 +179,8 @@ impl MessageTrait for MessageBatch {
179
179
self . final_message . properties = properties;
180
180
}
181
181
182
- fn get_transaction_id ( & self ) -> & CheetahString {
183
- self . final_message
184
- . transaction_id
185
- . as_ref ( )
186
- . expect ( "transaction_id is None" )
182
+ fn get_transaction_id ( & self ) -> Option < & CheetahString > {
183
+ self . final_message . transaction_id . as_ref ( )
187
184
}
188
185
189
186
fn set_transaction_id ( & mut self , transaction_id : CheetahString ) {
Original file line number Diff line number Diff line change @@ -105,7 +105,7 @@ impl MessageTrait for MessageClientExt {
105
105
self . message_ext_inner . set_properties ( properties) ;
106
106
}
107
107
108
- fn get_transaction_id ( & self ) -> & CheetahString {
108
+ fn get_transaction_id ( & self ) -> Option < & CheetahString > {
109
109
self . message_ext_inner . get_transaction_id ( )
110
110
}
111
111
Original file line number Diff line number Diff line change @@ -230,7 +230,7 @@ impl MessageExt {
230
230
impl Default for MessageExt {
231
231
fn default ( ) -> Self {
232
232
Self {
233
- message : Default :: default ( ) ,
233
+ message : Message :: default ( ) ,
234
234
broker_name : CheetahString :: default ( ) ,
235
235
queue_id : 0 ,
236
236
store_size : 0 ,
@@ -321,7 +321,7 @@ impl MessageTrait for MessageExt {
321
321
}
322
322
323
323
#[ inline]
324
- fn get_transaction_id ( & self ) -> & CheetahString {
324
+ fn get_transaction_id ( & self ) -> Option < & CheetahString > {
325
325
self . message . get_transaction_id ( )
326
326
}
327
327
Original file line number Diff line number Diff line change @@ -235,7 +235,7 @@ impl MessageTrait for MessageExtBrokerInner {
235
235
self . message_ext_inner . set_properties ( properties) ;
236
236
}
237
237
238
- fn get_transaction_id ( & self ) -> & CheetahString {
238
+ fn get_transaction_id ( & self ) -> Option < & CheetahString > {
239
239
self . message_ext_inner . get_transaction_id ( )
240
240
}
241
241
Original file line number Diff line number Diff line change @@ -38,7 +38,7 @@ use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
38
38
use crate :: common:: TopicFilterType ;
39
39
use crate :: MessageUtils ;
40
40
41
- #[ derive( Clone , Debug , Default ) ]
41
+ #[ derive( Clone , Debug ) ]
42
42
pub struct Message {
43
43
pub topic : CheetahString ,
44
44
pub flag : i32 ,
@@ -50,6 +50,19 @@ pub struct Message {
50
50
pub transaction_id : Option < CheetahString > ,
51
51
}
52
52
53
+ impl Default for Message {
54
+ fn default ( ) -> Self {
55
+ Self {
56
+ topic : CheetahString :: new ( ) ,
57
+ flag : 0 ,
58
+ properties : HashMap :: new ( ) ,
59
+ body : None ,
60
+ compressed_body : None ,
61
+ transaction_id : None ,
62
+ }
63
+ }
64
+ }
65
+
53
66
impl Message {
54
67
pub fn new ( topic : impl Into < CheetahString > , body : & [ u8 ] ) -> Self {
55
68
Self :: with_details (
@@ -328,10 +341,8 @@ impl MessageTrait for Message {
328
341
}
329
342
330
343
#[ inline]
331
- fn get_transaction_id ( & self ) -> & CheetahString {
332
- self . transaction_id
333
- . as_ref ( )
334
- . expect ( "transaction_id is None" )
344
+ fn get_transaction_id ( & self ) -> Option < & CheetahString > {
345
+ self . transaction_id . as_ref ( )
335
346
}
336
347
337
348
fn set_transaction_id ( & mut self , transaction_id : CheetahString ) {
You can’t perform that action at this time.
0 commit comments