Skip to content

[ISSUE #1456]🔥Refactor rocketmq-broker crate error handle🚨 #1457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum BrokerError {
#[error("broker client error: {0}")]
BrokerClientError(#[from] rocketmq_remoting::remoting_error::RemotingError),
BrokerRemotingError(#[from] rocketmq_remoting::remoting_error::RemotingError),

#[error("Common error: {0}")]
BrokerCommonError(#[from] rocketmq_common::error::Error),
Expand All @@ -34,3 +34,34 @@ pub enum BrokerError {
#[error("Client error: {0}")]
ClientError(#[from] rocketmq_client_rust::error::MQClientError),
}

#[cfg(test)]
mod tests {
use rocketmq_remoting::remoting_error::RemotingError;

use super::*;

#[test]
fn broker_remoting_error_displays_correctly() {
let error = BrokerError::BrokerRemotingError(RemotingError::RemoteError(
"remote error".to_string(),
));
assert_eq!(format!("{}", error), "broker client error: remote error");
}

#[test]
fn mq_broker_error_displays_correctly() {
let error =
BrokerError::MQBrokerError(404, "not found".to_string(), "127.0.0.1".to_string());
assert_eq!(
format!("{}", error),
"Client exception occurred: CODE:404, broker address:127.0.0.1, Message:not found"
);
}

#[test]
fn illegal_argument_error_displays_correctly() {
let error = BrokerError::IllegalArgumentError("illegal argument".to_string());
assert_eq!(format!("{}", error), "illegal argument");
}
}
8 changes: 4 additions & 4 deletions rocketmq-broker/src/client/net/broker_to_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;

use crate::error::BrokerError::BrokerClientError;
use crate::error::BrokerError::BrokerCommonError;
use crate::broker_error::BrokerError::BrokerCommonError;
use crate::broker_error::BrokerError::BrokerRemotingError;
use crate::Result;

#[derive(Default, Clone)]
Expand All @@ -38,7 +38,7 @@
) -> Result<RemotingCommand> {
match channel.send_wait_response(request, timeout_millis).await {
Ok(value) => Ok(value),
Err(e) => Err(BrokerClientError(e)),
Err(e) => Err(BrokerRemotingError(e)),

Check warning on line 41 in rocketmq-broker/src/client/net/broker_to_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/net/broker_to_client.rs#L41

Added line #L41 was not covered by tests
}
}

Expand All @@ -63,7 +63,7 @@
}
match channel.send_one_way(request, 100).await {
Ok(_) => Ok(()),
Err(e) => Err(BrokerClientError(e)),
Err(e) => Err(BrokerRemotingError(e)),

Check warning on line 66 in rocketmq-broker/src/client/net/broker_to_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/net/broker_to_client.rs#L66

Added line #L66 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

The 100ms timeout for transaction state checks is too short compared to other timeouts in the codebase

The verification reveals that the 100ms timeout for send_one_way in transaction state checks is inconsistent with other timeout configurations in the codebase:

  • The default transaction timeout in BrokerConfig is set to 6000ms (6 seconds)
  • Other RPC operations like producer's send_msg_timeout default to 3000ms
  • The broker's forward_timeout is set to 3000ms

The current 100ms timeout is significantly lower than these values and could lead to unnecessary transaction verification failures.

🔗 Analysis chain

Consider reviewing the timeout value for transaction state checks

While the error handling change to BrokerRemotingError is appropriate, the 100ms timeout for send_one_way in transaction state verification seems quite short. This could lead to unnecessary failures in high-load scenarios or when network latency is higher than usual.

Let's check if this timeout value is consistent with other transaction-related operations:

Consider increasing the timeout value or making it configurable based on the deployment environment and network conditions.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check timeout values used in transaction-related operations

# Search for send_one_way calls with timeout values
echo "Checking timeout values in send_one_way calls:"
rg -A 2 "send_one_way.*await"

# Search for transaction state related timeouts
echo "Checking transaction state related timeout configurations:"
rg "transaction.*timeout|timeout.*transaction"

Length of output: 1825


Script:

#!/bin/bash
# Let's check the actual implementation of the transaction check method and related configurations

# Check the implementation of check_producer_transaction_state
ast-grep --pattern 'fn check_producer_transaction_state'

# Check if there are any other RPC timeout configurations in the broker
rg -A 2 "timeout.*ms|timeout:.*[0-9]+"

# Check if there are any comments or documentation about RPC timeouts
rg "timeout" -g "*.md" -g "*.txt"

Length of output: 35628

}
}
}
4 changes: 2 additions & 2 deletions rocketmq-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
pub use broker_bootstrap::BrokerBootstrap;
pub use broker_bootstrap::Builder;

use crate::error::BrokerError;
use crate::broker_error::BrokerError;

pub mod command;

pub(crate) mod broker;
pub(crate) mod broker_bootstrap;
pub(crate) mod broker_error;
pub(crate) mod broker_path_config_helper;
pub(crate) mod broker_runtime;
pub(crate) mod client;
pub(crate) mod coldctr;
pub(crate) mod controller;
pub(crate) mod error;
pub(crate) mod filter;
pub(crate) mod hook;
pub(crate) mod load_balance;
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
use tracing::info;
use tracing::warn;

use crate::error::BrokerError;
use crate::error::BrokerError::BrokerClientError;
use crate::broker_error::BrokerError;
use crate::broker_error::BrokerError::BrokerRemotingError;
use crate::Result;

pub struct BrokerOuterAPI {
Expand Down Expand Up @@ -367,7 +367,7 @@
))
}
}
Err(e) => Err(BrokerClientError(e)),
Err(e) => Err(BrokerRemotingError(e)),

Check warning on line 370 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L370

Added line #L370 was not covered by tests
}
}

Expand Down Expand Up @@ -402,7 +402,7 @@
))
}
}
Err(e) => Err(BrokerClientError(e)),
Err(e) => Err(BrokerRemotingError(e)),

Check warning on line 405 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L405

Added line #L405 was not covered by tests
}
}

Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/processor/query_assignment_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::Se
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_remoting::protocol::{RemotingDeserializable, RemotingSerializable};
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::error::BrokerError;
use crate::error::BrokerError::IllegalArgumentError;
use crate::broker_error::BrokerError;
use crate::broker_error::BrokerError::IllegalArgumentError;
use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager;
use crate::Result;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use rocketmq_rust::RocketMQTokioMutex;
use tracing::info;
use tracing::warn;

use crate::error::BrokerError::MQBrokerError;
use crate::broker_error::BrokerError::MQBrokerError;
use crate::out_api::broker_outer_api::BrokerOuterAPI;

const GET_TOPIC_ROUTE_TIMEOUT: u64 = 3000;
Expand Down
Loading