Skip to content

Commit 55126f9

Browse files
committed
[ISSUE #777]🔥Fix Broker can not started🐛
1 parent b635421 commit 55126f9

File tree

3 files changed

+54
-5
lines changed

3 files changed

+54
-5
lines changed

rocketmq-broker/src/broker_runtime.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::any::Any;
1817
use std::collections::HashMap;
1918
use std::sync::atomic::AtomicBool;
2019
use std::sync::atomic::AtomicU64;
@@ -416,10 +415,11 @@ impl BrokerRuntime {
416415
self.broker_config.clone(),
417416
));
418417

419-
let pull_message_result_handler = pull_message_result_handler.as_mut() as &mut dyn Any;
418+
let pull_message_result_handler = pull_message_result_handler.as_mut().as_mut();
420419
pull_message_result_handler
420+
.as_any_mut()
421421
.downcast_mut::<DefaultPullMessageResultHandler>()
422-
.unwrap()
422+
.expect("downcast DefaultPullMessageResultHandler failed")
423423
.set_pull_request_hold_service(Some(Arc::new(
424424
self.pull_request_hold_service.clone().unwrap(),
425425
)));

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
17+
use std::any::Any;
1818
use std::net::SocketAddr;
1919
use std::sync::Arc;
2020

@@ -231,6 +231,14 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
231231
_ => None,
232232
}
233233
}
234+
235+
fn as_any_mut(&mut self) -> &mut dyn Any {
236+
self
237+
}
238+
239+
fn as_any(&self) -> &dyn Any {
240+
self
241+
}
234242
}
235243

236244
impl DefaultPullMessageResultHandler {

rocketmq-broker/src/processor/pull_message_result_handler.rs

+42-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::any::Any;
1718

1819
use rocketmq_remoting::net::channel::Channel;
1920
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
@@ -25,7 +26,37 @@ use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
2526
use rocketmq_store::base::get_message_result::GetMessageResult;
2627
use rocketmq_store::filter::MessageFilter;
2728

28-
pub trait PullMessageResultHandler: Sync + Send + 'static {
29+
/// Trait defining the behavior for handling the result of a pull message request.
30+
///
31+
/// This trait is designed to be implemented by types that handle the result of a pull message
32+
/// request in a RocketMQ broker. It provides a method for processing the result of a message
33+
/// retrieval operation, along with various parameters related to the request and the broker's
34+
/// state.
35+
pub trait PullMessageResultHandler: Sync + Send + Any + 'static {
36+
/// Handles the result of a pull message request.
37+
///
38+
/// This method processes the result of a message retrieval operation (`get_message_result`),
39+
/// using the provided request information, channel, context, subscription data, and other
40+
/// parameters to generate an appropriate response.
41+
///
42+
/// # Parameters
43+
/// - `get_message_result`: The result of the message retrieval operation.
44+
/// - `request`: The original remoting command representing the pull message request.
45+
/// - `request_header`: The header of the pull message request, containing request-specific
46+
/// information.
47+
/// - `channel`: The channel through which the request was received.
48+
/// - `ctx`: The connection handler context associated with the request.
49+
/// - `subscription_data`: Subscription data for the consumer making the request.
50+
/// - `subscription_group_config`: Configuration for the subscription group of the consumer.
51+
/// - `broker_allow_suspend`: Flag indicating whether the broker allows suspending the request.
52+
/// - `message_filter`: The message filter to apply to the retrieved messages.
53+
/// - `response`: The initial response remoting command to be potentially modified and returned.
54+
/// - `mapping_context`: Context for topic-queue mapping.
55+
/// - `begin_time_mills`: The timestamp (in milliseconds) when the request began processing.
56+
///
57+
/// # Returns
58+
/// An optional `RemotingCommand` representing the response to the pull message request.
59+
/// If `None`, it indicates that no response should be sent back to the client.
2960
fn handle(
3061
&self,
3162
get_message_result: GetMessageResult,
@@ -41,4 +72,14 @@ pub trait PullMessageResultHandler: Sync + Send + 'static {
4172
mapping_context: TopicQueueMappingContext,
4273
begin_time_mills: u64,
4374
) -> Option<RemotingCommand>;
75+
76+
/// Returns a mutable reference to `self` as a trait object of type `Any`.
77+
///
78+
/// This method is useful for downcasting the trait object to its concrete type.
79+
fn as_any_mut(&mut self) -> &mut dyn Any;
80+
81+
/// Returns a reference to `self` as a trait object of type `Any`.
82+
///
83+
/// This method is useful for downcasting the trait object to its concrete type.
84+
fn as_any(&self) -> &dyn Any;
4485
}

0 commit comments

Comments
 (0)