Skip to content

[ISSUE #415]🎨Optmize code logic #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,26 @@
name_server_address: None,
}
}

fn create_request(broker_name: String, topic_config: TopicConfig) -> RemotingCommand {
let request_header =
RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().unwrap());
let queue_data = QueueData::new(
broker_name.clone(),
topic_config.read_queue_nums,
topic_config.write_queue_nums,
topic_config.perm,
topic_config.topic_sys_flag,
);
let topic_route_data = TopicRouteData {
queue_datas: vec![queue_data],
..Default::default()
};
let topic_route_body = topic_route_data.encode();

Check warning on line 87 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L73-L87

Added lines #L73 - L87 were not covered by tests

RemotingCommand::create_request_command(RequestCode::RegisterTopicInNamesrv, request_header)
.set_body(Some(topic_route_body))
}

Check warning on line 91 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L89-L91

Added lines #L89 - L91 were not covered by tests
}

impl BrokerOuterAPI {
Expand Down Expand Up @@ -184,26 +204,7 @@
topic_config: TopicConfig,
timeout_mills: u64,
) {
let request_header =
RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().unwrap());
let queue_data = QueueData::new(
broker_name.clone(),
topic_config.read_queue_nums,
topic_config.write_queue_nums,
topic_config.perm,
topic_config.topic_sys_flag,
);
let topic_route_data = TopicRouteData {
queue_datas: vec![queue_data],
..Default::default()
};
let topic_route_body = topic_route_data.encode();

let request = RemotingCommand::create_request_command(
RequestCode::RegisterTopicInNamesrv,
request_header,
)
.set_body(Some(topic_route_body));
let request = Self::create_request(broker_name, topic_config);

Check warning on line 207 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L207

Added line #L207 was not covered by tests
let name_server_address_list = self.remoting_client.get_available_name_srv_list();
let mut handle_vec = Vec::with_capacity(name_server_address_list.len());
for namesrv_addr in name_server_address_list.iter() {
Expand Down
191 changes: 106 additions & 85 deletions rocketmq-broker/src/topic/manager/topic_config_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,81 +262,62 @@
client_default_topic_queue_nums: i32,
topic_sys_flag: u32,
) -> Option<TopicConfig> {
let mut create_new = false;
let lock = self
let (topic_config, create_new) = if let Some(_lock) = self

Check warning on line 265 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L265

Added line #L265 was not covered by tests
.topic_config_table_lock
.try_lock_for(Duration::from_secs(3));
let topic_config = if lock.is_some() {
let mut topic_config = self.get_topic_config(topic);
if topic_config.is_some() {
return topic_config;
.try_lock_for(Duration::from_secs(3))

Check warning on line 267 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L267

Added line #L267 was not covered by tests
{
if let Some(topic_config) = self.get_topic_config(topic) {
return Some(topic_config);

Check warning on line 270 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L269-L270

Added lines #L269 - L270 were not covered by tests
}
let mut default_topic_config = self.get_topic_config(default_topic);
let topic_config = if default_topic_config.is_some() {
//default topic

if let Some(mut default_topic_config) = self.get_topic_config(default_topic) {

Check warning on line 273 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L273

Added line #L273 was not covered by tests
if default_topic == TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC
&& !self.broker_config.auto_create_topic_enable
{
default_topic_config.as_mut().unwrap().perm =
PermName::PERM_READ | PermName::PERM_WRITE;
default_topic_config.perm = PermName::PERM_READ | PermName::PERM_WRITE;

Check warning on line 277 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L277

Added line #L277 was not covered by tests
}

if PermName::is_inherited(default_topic_config.as_ref().unwrap().perm) {
topic_config = Some(TopicConfig::new(topic));
let mut queue_nums = client_default_topic_queue_nums
.min(default_topic_config.as_ref().unwrap().write_queue_nums as i32);
if queue_nums < 0 {
queue_nums = 0;
}
let ref_topic_config = topic_config.as_mut().unwrap();
ref_topic_config.write_queue_nums = queue_nums as u32;
ref_topic_config.read_queue_nums = queue_nums as u32;
let mut perm = default_topic_config.as_ref().unwrap().perm;
perm &= !PermName::PERM_INHERIT;
ref_topic_config.perm = perm;
ref_topic_config.topic_sys_flag = topic_sys_flag;
ref_topic_config.topic_filter_type =
default_topic_config.as_ref().unwrap().topic_filter_type
} else {
warn!(
"Create new topic failed, because the default topic[{}] has no perm [{}] \
producer:[{}]",
default_topic,
default_topic_config.as_ref().unwrap().perm,
remote_address
);
}

if topic_config.is_some() {
if PermName::is_inherited(default_topic_config.perm) {
let mut topic_config = TopicConfig::new(topic);
let queue_nums = client_default_topic_queue_nums
.min(default_topic_config.write_queue_nums as i32)

Check warning on line 283 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L280-L283

Added lines #L280 - L283 were not covered by tests
.max(0);
topic_config.write_queue_nums = queue_nums as u32;
topic_config.read_queue_nums = queue_nums as u32;
topic_config.perm = default_topic_config.perm & !PermName::PERM_INHERIT;
topic_config.topic_sys_flag = topic_sys_flag;
topic_config.topic_filter_type = default_topic_config.topic_filter_type;

Check warning on line 289 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L285-L289

Added lines #L285 - L289 were not covered by tests
info!(
"Create new topic by default topic:[{}] config:[{:?}] producer:[{}]",
default_topic,
topic_config.as_ref().unwrap(),
remote_address
default_topic, topic_config, remote_address
);
let _ = self.put_topic_config(topic_config.clone().unwrap());
self.put_topic_config(topic_config.clone());

Check warning on line 294 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L294

Added line #L294 was not covered by tests
self.data_version.lock().next_version_with(
self.message_store
.as_ref()
.unwrap()
.get_state_machine_version(),
);
create_new = true;
self.persist();
(Some(topic_config), true)
} else {
warn!(

Check warning on line 304 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L302-L304

Added lines #L302 - L304 were not covered by tests
"Create new topic failed, because the default topic[{}] has no perm [{}] \
producer:[{}]",
default_topic, default_topic_config.perm, remote_address
);
(None, false)

Check warning on line 309 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L309

Added line #L309 was not covered by tests
}
topic_config
} else {
None
};
topic_config
(None, false)

Check warning on line 312 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L312

Added line #L312 was not covered by tests
}
} else {
None
(None, false)

Check warning on line 315 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L315

Added line #L315 was not covered by tests
};
drop(lock);

if create_new {
self.register_broker_data(topic_config.as_ref().unwrap());
}

topic_config
}

Expand All @@ -348,52 +329,46 @@
is_order: bool,
topic_sys_flag: u32,
) -> Option<TopicConfig> {
let mut topic_config = self.get_topic_config(topic);
if let Some(ref mut config) = topic_config {
if let Some(ref mut config) = self.get_topic_config(topic) {

Check warning on line 332 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L332

Added line #L332 was not covered by tests
if is_order != config.order {
config.order = is_order;
self.update_topic_config(config);
}
return topic_config;
return Some(config.clone());

Check warning on line 337 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L337

Added line #L337 was not covered by tests
}
let mut create_new = false;

let lock = self
let (topic_config, create_new) = if let Some(_lock) = self

Check warning on line 340 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L340

Added line #L340 was not covered by tests
.topic_config_table_lock
.try_lock_for(Duration::from_secs(3));
let topic_config_result = if lock.is_some() {
topic_config = self.get_topic_config(topic);
if topic_config.is_some() {
return topic_config;
}
topic_config = Some(TopicConfig::new(topic));
if let Some(ref mut config) = topic_config {
config.read_queue_nums = client_default_topic_queue_nums as u32;
config.write_queue_nums = client_default_topic_queue_nums as u32;
config.perm = perm;
config.topic_sys_flag = topic_sys_flag;
config.order = is_order;
info!("create new topic {:?}", config);
self.put_topic_config(config.clone());
create_new = true;
self.data_version.lock().next_version_with(
self.message_store
.as_ref()
.unwrap()
.get_state_machine_version(),
);
self.persist();
.try_lock_for(Duration::from_secs(3))

Check warning on line 342 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L342

Added line #L342 was not covered by tests
{
if let Some(config) = self.get_topic_config(topic) {
return Some(config);

Check warning on line 345 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L344-L345

Added lines #L344 - L345 were not covered by tests
}
topic_config

let mut config = TopicConfig::new(topic);
config.read_queue_nums = client_default_topic_queue_nums as u32;
config.write_queue_nums = client_default_topic_queue_nums as u32;
config.perm = perm;
config.topic_sys_flag = topic_sys_flag;
config.order = is_order;

Check warning on line 353 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L348-L353

Added lines #L348 - L353 were not covered by tests

self.put_topic_config(config.clone());
self.data_version.lock().next_version_with(
self.message_store

Check warning on line 357 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L355-L357

Added lines #L355 - L357 were not covered by tests
.as_ref()
.unwrap()
.get_state_machine_version(),
);
self.persist();
(Some(config), true)

Check warning on line 363 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L361-L363

Added lines #L361 - L363 were not covered by tests
} else {
None
(None, false)

Check warning on line 365 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L365

Added line #L365 was not covered by tests
};
drop(lock);

if create_new {
self.register_broker_data(topic_config_result.as_ref().unwrap());
self.register_broker_data(topic_config.as_ref().unwrap());

Check warning on line 369 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L369

Added line #L369 was not covered by tests
}

topic_config_result
topic_config

Check warning on line 371 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L371

Added line #L371 was not covered by tests
}

fn register_broker_data(&mut self, topic_config: &TopicConfig) {
Expand Down Expand Up @@ -471,6 +446,52 @@
pub fn set_message_store(&mut self, message_store: Option<DefaultMessageStore>) {
self.message_store = message_store;
}

pub fn create_topic_of_tran_check_max_time(

Check warning on line 450 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L450

Added line #L450 was not covered by tests
&mut self,
client_default_topic_queue_nums: i32,
perm: u32,
) -> Option<TopicConfig> {
if let Some(ref mut config) =
self.get_topic_config(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC)

Check warning on line 456 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L455-L456

Added lines #L455 - L456 were not covered by tests
{
return Some(config.clone());
}

Check warning on line 459 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L458-L459

Added lines #L458 - L459 were not covered by tests

let (topic_config, create_new) = if let Some(_lock) = self

Check warning on line 461 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L461

Added line #L461 was not covered by tests
.topic_config_table_lock
.try_lock_for(Duration::from_secs(3))

Check warning on line 463 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L463

Added line #L463 was not covered by tests
{
if let Some(config) =
self.get_topic_config(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC)

Check warning on line 466 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L465-L466

Added lines #L465 - L466 were not covered by tests
{
return Some(config);
}

Check warning on line 469 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L468-L469

Added lines #L468 - L469 were not covered by tests

let mut config = TopicConfig::new(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
config.read_queue_nums = client_default_topic_queue_nums as u32;
config.write_queue_nums = client_default_topic_queue_nums as u32;
config.perm = perm;
config.topic_sys_flag = 0;
info!("create new topic {:?}", config);
self.put_topic_config(config.clone());
self.data_version.lock().next_version_with(
self.message_store

Check warning on line 479 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L471-L479

Added lines #L471 - L479 were not covered by tests
.as_ref()
.unwrap()
.get_state_machine_version(),
);
self.persist();
(Some(config), true)
} else {
(None, false)
};

Check warning on line 488 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L483-L488

Added lines #L483 - L488 were not covered by tests

if create_new {
self.register_broker_data(topic_config.as_ref().unwrap());

Check warning on line 491 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L490-L491

Added lines #L490 - L491 were not covered by tests
}
topic_config
}

Check warning on line 494 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L493-L494

Added lines #L493 - L494 were not covered by tests
}

//Fully implemented will be removed
Expand Down
21 changes: 6 additions & 15 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,15 @@
timeout_millis: u64,
) -> Result<RemotingCommand, RemotingError> {
let client = self.get_and_create_client(addr.clone());
/*if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async {
client.lock().await.send_read(request).await.unwrap()
})
.await*/

match tokio::spawn(async move {
match time::timeout(Duration::from_millis(timeout_millis), async move {
client.lock().await.send_read(request).await.unwrap()
})
.await
{
Ok(result) => Ok(result),
Err(err) => Err(RemotingError::RemoteException(err.to_string())),
}
match time::timeout(Duration::from_millis(timeout_millis), async move {
client.lock().await.send_read(request).await

Check warning on line 180 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L179-L180

Added lines #L179 - L180 were not covered by tests
})
.await
{
Ok(result) => result,
Ok(result) => match result {
Ok(response) => Ok(response),
Err(err) => Err(RemotingError::RemoteException(err.to_string())),

Check warning on line 186 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L184-L186

Added lines #L184 - L186 were not covered by tests
},
Err(err) => Err(RemotingError::RemoteException(err.to_string())),
}
}
Expand Down
Loading