@@ -27,6 +27,10 @@ use crate::load_balance::message_request_mode_manager::MessageRequestModeManager
27
27
use rocketmq_store:: config:: message_store_config:: MessageStoreConfig ;
28
28
use std:: collections:: HashMap ;
29
29
use std:: sync:: Arc ;
30
+ use rocketmq_common:: common:: mix_all:: RETRY_GROUP_TOPIC_PREFIX ;
31
+ use rocketmq_remoting:: code:: response_code:: ResponseCode ;
32
+ use rocketmq_remoting:: protocol:: body:: set_message_request_mode_request_body:: SetMessageRequestModeRequestBody ;
33
+ use rocketmq_remoting:: protocol:: RemotingDeserializable ;
30
34
31
35
pub struct QueryAssignmentProcessor {
32
36
message_request_mode_manager : MessageRequestModeManager ,
@@ -89,8 +93,27 @@ impl QueryAssignmentProcessor {
89
93
& mut self ,
90
94
_channel : Channel ,
91
95
_ctx : ConnectionHandlerContext ,
92
- _request : RemotingCommand ,
96
+ request : RemotingCommand ,
93
97
) -> Option < RemotingCommand > {
94
- unimplemented ! ( )
98
+ let request_body =
99
+ SetMessageRequestModeRequestBody :: decode ( request. get_body ( ) . expect ( "empty body" ) )
100
+ . expect ( "decode SetMessageRequestModeRequestBody failed" ) ;
101
+ if request_body. topic . starts_with ( RETRY_GROUP_TOPIC_PREFIX ) {
102
+ return Some (
103
+ RemotingCommand :: create_response_command_with_code ( ResponseCode :: NoPermission )
104
+ . set_remark ( CheetahString :: from_static_str (
105
+ "retry topic is not allowed to set mode" ,
106
+ ) ) ,
107
+ ) ;
108
+ }
109
+ self . message_request_mode_manager . set_message_request_mode (
110
+ request_body. topic . clone ( ) ,
111
+ request_body. consumer_group . clone ( ) ,
112
+ request_body,
113
+ ) ;
114
+ self . message_request_mode_manager . persist ( ) ;
115
+ Some ( RemotingCommand :: create_response_command_with_code (
116
+ ResponseCode :: Success ,
117
+ ) )
95
118
}
96
119
}
0 commit comments