@@ -21,15 +21,20 @@ use rocketmq_client_rust::producer::send_result::SendResult;
21
21
use rocketmq_client_rust:: producer:: send_status:: SendStatus ;
22
22
use rocketmq_common:: common:: broker:: broker_config:: BrokerConfig ;
23
23
use rocketmq_common:: common:: message:: message_ext_broker_inner:: MessageExtBrokerInner ;
24
+ use rocketmq_common:: common:: message:: message_queue:: MessageQueue ;
25
+ use rocketmq_common:: common:: message:: MessageConst ;
24
26
use rocketmq_common:: common:: message:: MessageTrait ;
25
27
use rocketmq_common:: common:: mix_all;
26
28
use rocketmq_runtime:: RocketMQRuntime ;
27
29
use rocketmq_rust:: ArcMut ;
28
30
use rocketmq_store:: base:: message_result:: PutMessageResult ;
29
31
use rocketmq_store:: base:: message_status_enum:: PutMessageStatus ;
30
32
use rocketmq_store:: log_file:: MessageStore ;
33
+ use tracing:: warn;
31
34
35
+ use crate :: out_api:: broker_outer_api:: BrokerOuterAPI ;
32
36
use crate :: topic:: manager:: topic_route_info_manager:: TopicRouteInfoManager ;
37
+ use crate :: transaction:: queue:: transactional_message_util:: TransactionalMessageUtil ;
33
38
34
39
const SEND_TIMEOUT : u64 = 3_000 ;
35
40
const DEFAULT_PULL_TIMEOUT_MILLIS : u64 = 10_000 ;
@@ -79,6 +84,7 @@ pub(crate) struct EscapeBridge<MS> {
79
84
message_store : ArcMut < MS > ,
80
85
broker_config : Arc < BrokerConfig > ,
81
86
topic_route_info_manager : Arc < TopicRouteInfoManager > ,
87
+ broker_outer_api : Arc < BrokerOuterAPI > ,
82
88
}
83
89
84
90
impl < MS > EscapeBridge < MS >
@@ -104,10 +110,110 @@ where
104
110
105
111
pub async fn put_message_to_remote_broker (
106
112
& mut self ,
107
- _message_ext : MessageExtBrokerInner ,
108
- _broker_name_to_send : Option < CheetahString > ,
113
+ message_ext : MessageExtBrokerInner ,
114
+ mut broker_name_to_send : Option < CheetahString > ,
109
115
) -> Option < SendResult > {
110
- unimplemented ! ( "EscapeBridge putMessageToRemoteBroker" )
116
+ if broker_name_to_send. is_some ( )
117
+ && self . broker_config . broker_identity . broker_name . as_str ( )
118
+ == broker_name_to_send. as_ref ( ) . unwrap ( ) . as_str ( )
119
+ {
120
+ return None ;
121
+ }
122
+ let is_trans_half_message =
123
+ TransactionalMessageUtil :: build_half_topic ( ) == message_ext. get_topic ( ) ;
124
+ let mut message_to_put = if is_trans_half_message {
125
+ TransactionalMessageUtil :: build_transactional_message_from_half_message (
126
+ & message_ext. message_ext_inner ,
127
+ )
128
+ } else {
129
+ message_ext
130
+ } ;
131
+ let topic_publish_info = self
132
+ . topic_route_info_manager
133
+ . try_to_find_topic_publish_info ( message_to_put. get_topic ( ) )
134
+ . await ;
135
+ if !topic_publish_info. as_ref ( ) . is_some_and ( |value| value. ok ( ) ) {
136
+ warn ! (
137
+ "putMessageToRemoteBroker: no route info of topic {} when escaping message, \
138
+ msgId={}",
139
+ message_to_put. get_topic( ) ,
140
+ message_to_put. message_ext_inner. msg_id
141
+ ) ;
142
+ return None ;
143
+ }
144
+ let topic_publish_info = topic_publish_info. unwrap ( ) ;
145
+ let _mq_selected = if !broker_name_to_send
146
+ . as_ref ( )
147
+ . is_some_and ( |value| !value. is_empty ( ) )
148
+ {
149
+ let mq = topic_publish_info
150
+ . select_one_message_queue_by_broker ( broker_name_to_send. as_ref ( ) )
151
+ . unwrap ( ) ;
152
+ message_to_put. message_ext_inner . queue_id = mq. get_queue_id ( ) ;
153
+ broker_name_to_send = Some ( mq. get_broker_name ( ) . clone ( ) ) ;
154
+ if self . broker_config . broker_identity . broker_name . as_str ( )
155
+ == mq. get_broker_name ( ) . as_str ( )
156
+ {
157
+ warn ! (
158
+ "putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: \
159
+ {}, Broker: {}",
160
+ message_to_put. get_topic( ) ,
161
+ message_to_put. message_ext_inner. msg_id,
162
+ mq. get_broker_name( )
163
+ ) ;
164
+ return None ;
165
+ }
166
+ mq
167
+ } else {
168
+ MessageQueue :: from_parts (
169
+ message_to_put. get_topic ( ) ,
170
+ broker_name_to_send. clone ( ) . unwrap ( ) ,
171
+ message_to_put. queue_id ( ) ,
172
+ )
173
+ } ;
174
+ let broker_addr_to_send = self
175
+ . topic_route_info_manager
176
+ . find_broker_address_in_publish ( broker_name_to_send. as_ref ( ) ) ;
177
+ if broker_addr_to_send. is_none ( ) {
178
+ warn ! (
179
+ "putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, \
180
+ Broker: {}",
181
+ message_to_put. get_topic( ) ,
182
+ message_to_put. message_ext_inner. msg_id,
183
+ broker_name_to_send. as_ref( ) . unwrap( )
184
+ ) ;
185
+ return None ;
186
+ }
187
+ let producer_group = self . get_producer_group ( & message_to_put) ;
188
+ let result = self
189
+ . broker_outer_api
190
+ . send_message_to_specific_broker (
191
+ broker_addr_to_send. as_ref ( ) . unwrap ( ) ,
192
+ message_to_put. message_ext_inner ,
193
+ producer_group,
194
+ SEND_TIMEOUT ,
195
+ )
196
+ . await ;
197
+ match result {
198
+ Ok ( value) => {
199
+ if value. send_status == SendStatus :: SendOk {
200
+ Some ( value)
201
+ } else {
202
+ None
203
+ }
204
+ }
205
+ Err ( _) => None ,
206
+ }
207
+ }
208
+
209
+ fn get_producer_group ( & self , message_ext : & MessageExtBrokerInner ) -> CheetahString {
210
+ let producer_group = message_ext. get_property ( & CheetahString :: from_static_str (
211
+ MessageConst :: PROPERTY_PRODUCER_GROUP ,
212
+ ) ) ;
213
+ match producer_group {
214
+ None => self . inner_producer_group_name . clone ( ) ,
215
+ Some ( value) => value,
216
+ }
111
217
}
112
218
}
113
219
0 commit comments