Skip to content

Commit a3d290b

Browse files
committed
[ISSUE #262]🚀Support register borker(request code:103)
1 parent 1334fcf commit a3d290b

File tree

7 files changed

+65
-74
lines changed

7 files changed

+65
-74
lines changed

rocketmq-broker/src/broker_runtime.rs

+6-26
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,8 @@ impl BrokerRuntime {
243243
.unwrap()
244244
.get_handle()
245245
.spawn(async move {
246-
let period = Duration::from_secs(5);
247-
let initial_delay = Some(Duration::from_secs(5));
246+
let period = Duration::from_secs(10);
247+
let initial_delay = Some(Duration::from_secs(60));
248248
// initial delay
249249
if let Some(initial_delay_inner) = initial_delay {
250250
tokio::time::sleep(initial_delay_inner).await;
@@ -272,10 +272,6 @@ impl BrokerRuntime {
272272

273273
/// Register broker to name server
274274
pub(crate) async fn register_broker_all(
275-
/* broker_config: Arc<BrokerConfig>,
276-
topic_config_manager: Arc<RwLock<TopicConfigManager>>,
277-
topic_queue_mapping_manager: Arc<RwLock<TopicQueueMappingManager>>,
278-
broker_out_api: Arc<RwLock<BrokerOuterAPI>>,*/
279275
&mut self,
280276
check_order_config: bool,
281277
oneway: bool,
@@ -307,15 +303,8 @@ impl BrokerRuntime {
307303
let topic_config_wrapper = self
308304
.topic_config_manager
309305
.build_serialize_wrapper(topic_config_table.clone());
310-
self.do_register_broker_all(
311-
/*Self::do_register_broker_all(
312-
self.broker_config.clone(),
313-
self.broker_out_api.clone(),*/
314-
check_order_config,
315-
oneway,
316-
topic_config_wrapper,
317-
)
318-
.await;
306+
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper)
307+
.await;
319308
topic_config_table.clear();
320309
}
321310

@@ -359,15 +348,8 @@ impl BrokerRuntime {
359348
self.broker_config.is_in_broker_container,
360349
)
361350
{
362-
self.do_register_broker_all(
363-
/*Self::do_register_broker_all(
364-
self.broker_config.clone(),
365-
self.broker_out_api.clone(),*/
366-
check_order_config,
367-
oneway,
368-
topic_config_wrapper,
369-
)
370-
.await;
351+
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper)
352+
.await;
371353
}
372354
}
373355

@@ -383,8 +365,6 @@ impl BrokerRuntime {
383365
}
384366

385367
async fn do_register_broker_all(
386-
/*broker_config: Arc<BrokerConfig>,
387-
broker_out_api: Arc<RwLock<BrokerOuterAPI>>,*/
388368
&mut self,
389369
_check_order_config: bool,
390370
oneway: bool,

rocketmq-broker/src/out_api/broker_outer_api.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ impl BrokerOuterAPI {
106106
heartbeat_timeout_millis,
107107
body_crc32: 0,
108108
};
109+
110+
//build request body
109111
let request_body = RegisterBrokerBody {
110112
topic_config_serialize_wrapper: topic_config_wrapper,
111113
filter_server_list,
@@ -119,15 +121,18 @@ impl BrokerOuterAPI {
119121
let cloned_body = body.clone();
120122
let cloned_header = request_header.clone();
121123
let addr = namesrv_addr.clone();
122-
let handle = self
123-
.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body)
124-
.await;
124+
let handle =
125+
self.register_broker(addr, oneway, timeout_mills, cloned_header, cloned_body);
125126
handle_vec.push(handle);
126127
}
127128

128-
for handle in handle_vec {
129+
/*for handle in handle_vec {
129130
//let result = self.broker_outer_executor.spawn(handle).await;
130131
register_broker_result_list.push(handle.unwrap());
132+
}*/
133+
while let Some(handle) = handle_vec.pop() {
134+
let result = tokio::join!(handle);
135+
register_broker_result_list.push(result.0.unwrap());
131136
}
132137
}
133138

@@ -147,10 +152,10 @@ impl BrokerOuterAPI {
147152
.set_body(Some(body.clone()));
148153

149154
if oneway {
150-
// match remoting_client.lock().unwrap().invoke_oneway(
151155
match self
152156
.remoting_client
153157
.invoke_oneway(namesrv_addr, request, timeout_mills)
158+
.await
154159
{
155160
Ok(_) => return None,
156161
Err(_) => {

rocketmq-remoting/src/clients.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub trait RemotingClient: RemotingService {
102102
invoke_callback: impl InvokeCallback,
103103
) -> Result<(), Box<dyn std::error::Error>>;
104104

105-
fn invoke_oneway(
105+
async fn invoke_oneway(
106106
&self,
107107
addr: String,
108108
request: RemotingCommand,

rocketmq-remoting/src/clients/blocking_client.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ impl BlockingClient {
6262
request: RemotingCommand,
6363
timeout: Duration,
6464
) -> anyhow::Result<()> {
65-
match self.rt.block_on(tokio::time::timeout(
66-
timeout,
67-
self.inner.send_request(request),
68-
)) {
65+
match self
66+
.rt
67+
.block_on(tokio::time::timeout(timeout, self.inner.send(request)))
68+
{
6969
Ok(Ok(_)) => Ok(()),
7070
Ok(Err(err)) => Err(err.into()),
7171
Err(err) => Err(err.into()),

rocketmq-remoting/src/clients/client.rs

+5-8
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ impl Client {
6060
///
6161
/// The `RemotingCommand` representing the response, wrapped in a `Result`. Returns an error if
6262
/// the invocation fails.
63-
pub async fn invoke(&mut self, request: RemotingCommand) -> anyhow::Result<RemotingCommand> {
64-
self.send_request(request).await?;
65-
let response = self.read_response().await?;
63+
pub async fn send_read(&mut self, request: RemotingCommand) -> anyhow::Result<RemotingCommand> {
64+
self.send(request).await?;
65+
let response = self.read().await?;
6666
Ok(response)
6767
}
6868

@@ -90,10 +90,7 @@ impl Client {
9090
/// # Returns
9191
///
9292
/// A `Result` indicating success or failure in sending the request.
93-
pub async fn send_request(
94-
&mut self,
95-
request: RemotingCommand,
96-
) -> anyhow::Result<(), RemotingError> {
93+
pub async fn send(&mut self, request: RemotingCommand) -> anyhow::Result<(), RemotingError> {
9794
self.connection.framed.send(request).await?;
9895
Ok(())
9996
}
@@ -104,7 +101,7 @@ impl Client {
104101
///
105102
/// The `RemotingCommand` representing the response, wrapped in a `Result`. Returns an error if
106103
/// reading the response fails.
107-
async fn read_response(&mut self) -> anyhow::Result<RemotingCommand, RemotingError> {
104+
async fn read(&mut self) -> anyhow::Result<RemotingCommand, RemotingError> {
108105
let response = self.connection.framed.next().await;
109106
response.unwrap()
110107
}

rocketmq-remoting/src/clients/rocketmq_default_impl.rs

+29-16
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,20 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::{collections::HashMap, error::Error, sync::Arc};
17+
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};
1818

1919
use rocketmq_common::TokioExecutorService;
20+
use tokio::{runtime::Handle, time, time::timeout};
2021
use tracing::info;
2122

2223
use crate::{
2324
clients::{Client, RemotingClient},
2425
protocol::remoting_command::RemotingCommand,
2526
remoting::{InvokeCallback, RemotingService},
26-
runtime::{
27-
config::client_config::TokioClientConfig, processor::RequestProcessor, RPCHook,
28-
ServiceBridge,
29-
},
27+
runtime::{config::client_config::TokioClientConfig, processor::RequestProcessor, RPCHook},
3028
};
3129

3230
pub struct RocketmqDefaultClient {
33-
service_bridge: ServiceBridge,
3431
tokio_client_config: TokioClientConfig,
3532
//cache connection
3633
connection_tables:
@@ -42,7 +39,6 @@ pub struct RocketmqDefaultClient {
4239
impl RocketmqDefaultClient {
4340
pub fn new(tokio_client_config: TokioClientConfig) -> Self {
4441
Self {
45-
service_bridge: ServiceBridge::new(),
4642
tokio_client_config,
4743
connection_tables: Default::default(),
4844
namesrv_addr_list: Arc::new(Default::default()),
@@ -133,7 +129,8 @@ impl RemotingClient for RocketmqDefaultClient {
133129
}
134130

135131
fn get_name_server_address_list(&self) -> Vec<String> {
136-
todo!()
132+
let cloned = self.namesrv_addr_list.clone();
133+
Handle::current().block_on(async move { cloned.lock().await.clone() })
137134
}
138135

139136
fn get_available_name_srv_list(&self) -> Vec<String> {
@@ -147,10 +144,15 @@ impl RemotingClient for RocketmqDefaultClient {
147144
timeout_millis: u64,
148145
) -> RemotingCommand {
149146
let client = self.get_and_create_client(addr.clone()).await;
150-
let client_ref = &mut *client.lock().await;
151-
ServiceBridge::invoke_sync(client_ref, request, timeout_millis)
152-
.await
153-
.unwrap()
147+
if let Ok(result) = timeout(Duration::from_millis(timeout_millis), async {
148+
client.lock().await.send_read(request).await.unwrap()
149+
})
150+
.await
151+
{
152+
result
153+
} else {
154+
RemotingCommand::create_response_command()
155+
}
154156
}
155157

156158
async fn invoke_async(
@@ -161,18 +163,29 @@ impl RemotingClient for RocketmqDefaultClient {
161163
invoke_callback: impl InvokeCallback,
162164
) -> Result<(), Box<dyn Error>> {
163165
let client = self.get_and_create_client(addr.clone()).await;
164-
let client_ref = &mut *client.lock().await;
165-
ServiceBridge::invoke_async(client_ref, request, timeout_millis, invoke_callback).await;
166+
if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async {
167+
client.lock().await.send_read(request).await.unwrap()
168+
})
169+
.await
170+
{
171+
invoke_callback.operation_succeed(resp)
172+
}
173+
166174
Ok(())
167175
}
168176

169-
fn invoke_oneway(
177+
async fn invoke_oneway(
170178
&self,
171179
addr: String,
172180
request: RemotingCommand,
173181
timeout_millis: u64,
174182
) -> Result<(), Box<dyn Error>> {
175-
todo!()
183+
let client = self.get_and_create_client(addr.clone()).await;
184+
let _ = time::timeout(Duration::from_millis(timeout_millis), async move {
185+
client.lock().await.send(request).await.unwrap()
186+
})
187+
.await;
188+
Ok(())
176189
}
177190

178191
fn register_processor(

rocketmq-remoting/src/runtime.rs

+10-14
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,11 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::{collections::HashMap, sync::Arc, time::Duration};
19-
20-
use tokio::{time, time::timeout};
18+
use std::{collections::HashMap, sync::Arc};
2119

2220
use crate::{
23-
clients::Client,
24-
net::ResponseFuture,
25-
protocol::{remoting_command::RemotingCommand, RemotingCommandType},
26-
remoting::InvokeCallback,
27-
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
21+
net::ResponseFuture, protocol::remoting_command::RemotingCommand,
22+
runtime::processor::RequestProcessor,
2823
};
2924

3025
pub mod config;
@@ -80,7 +75,7 @@ impl Default for ServiceBridge {
8075
}
8176

8277
impl ServiceBridge {
83-
pub fn process_message_received(
78+
/* pub fn process_message_received(
8479
&mut self,
8580
ctx: ConnectionHandlerContext,
8681
msg: RemotingCommand,
@@ -103,25 +98,26 @@ impl ServiceBridge {
10398
_ctx: ConnectionHandlerContext,
10499
_msg: RemotingCommand,
105100
) {
106-
}
101+
}*/
107102

103+
/* #[allow(unused_variables)]
108104
pub async fn invoke_async(
109105
client: &mut Client,
110106
//client: Arc<Mutex<Client>>,
111107
request: RemotingCommand,
112108
timeout_millis: u64,
113109
invoke_callback: impl InvokeCallback,
114110
) {
115-
if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async {
111+
if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async {
116112
client.invoke(request).await.unwrap()
117113
})
118114
.await
119115
{
120116
invoke_callback.operation_succeed(resp)
121117
}
122-
}
118+
}*/
123119

124-
pub async fn invoke_sync(
120+
/* pub async fn invoke_sync(
125121
client: &mut Client,
126122
request: RemotingCommand,
127123
timeout_millis: u64,
@@ -132,5 +128,5 @@ impl ServiceBridge {
132128
})
133129
.await;
134130
Some(result.unwrap())
135-
}
131+
}*/
136132
}

0 commit comments

Comments
 (0)