Skip to content

Commit e0fe4a7

Browse files
authored
[ISSUE #1303]🚀Add QueryAssignmentRequestBody and QueryAssignmentResponseBody (#1304)
1 parent b45646d commit e0fe4a7

File tree

6 files changed

+205
-1
lines changed

6 files changed

+205
-1
lines changed

‎rocketmq-common/src/common/message.rs

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod message_ext;
3737
pub mod message_ext_broker_inner;
3838
pub mod message_id;
3939
pub mod message_queue;
40+
pub mod message_queue_assignment;
4041
pub mod message_single;
4142

4243
/// This module defines the `MessageTrait` trait, which provides a flexible interface for working

‎rocketmq-common/src/common/message/message_enum.rs

+87-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::fmt;
18+
19+
use serde::de;
20+
use serde::de::Visitor;
21+
use serde::Deserialize;
22+
use serde::Deserializer;
23+
use serde::Serialize;
24+
use serde::Serializer;
25+
1726
#[derive(Debug, PartialEq, Copy, Clone, Default)]
1827
pub enum MessageType {
1928
#[default]
@@ -47,7 +56,7 @@ impl MessageType {
4756
}
4857
}
4958

50-
#[derive(Debug, PartialEq, Copy, Clone)]
59+
#[derive(Debug, PartialEq, Copy, Clone, Hash, Eq)]
5160
pub enum MessageRequestMode {
5261
Pull,
5362
Pop,
@@ -62,6 +71,48 @@ impl MessageRequestMode {
6271
}
6372
}
6473

74+
impl Serialize for MessageRequestMode {
75+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
76+
where
77+
S: Serializer,
78+
{
79+
serializer.serialize_str(match *self {
80+
MessageRequestMode::Pull => "PULL",
81+
MessageRequestMode::Pop => "POP",
82+
})
83+
}
84+
}
85+
86+
impl<'de> Deserialize<'de> for MessageRequestMode {
87+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
88+
where
89+
D: Deserializer<'de>,
90+
{
91+
struct MessageRequestModeVisitor;
92+
93+
impl Visitor<'_> for MessageRequestModeVisitor {
94+
type Value = MessageRequestMode;
95+
96+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
97+
formatter.write_str("a string representing a MessageRequestMode")
98+
}
99+
100+
fn visit_str<E>(self, value: &str) -> Result<MessageRequestMode, E>
101+
where
102+
E: de::Error,
103+
{
104+
match value {
105+
"PULL" => Ok(MessageRequestMode::Pull),
106+
"POP" => Ok(MessageRequestMode::Pop),
107+
_ => Err(de::Error::unknown_variant(value, &["PULL", "POP"])),
108+
}
109+
}
110+
}
111+
112+
deserializer.deserialize_str(MessageRequestModeVisitor)
113+
}
114+
}
115+
65116
#[cfg(test)]
66117
mod tests {
67118
use super::*;
@@ -108,4 +159,39 @@ mod tests {
108159
assert_eq!(MessageRequestMode::Pull.get_name(), "PULL");
109160
assert_eq!(MessageRequestMode::Pop.get_name(), "POP");
110161
}
162+
163+
#[test]
164+
fn serialize_message_request_mode_pull() {
165+
let mode = MessageRequestMode::Pull;
166+
let serialized = serde_json::to_string(&mode).unwrap();
167+
assert_eq!(serialized, "\"PULL\"");
168+
}
169+
170+
#[test]
171+
fn serialize_message_request_mode_pop() {
172+
let mode = MessageRequestMode::Pop;
173+
let serialized = serde_json::to_string(&mode).unwrap();
174+
assert_eq!(serialized, "\"POP\"");
175+
}
176+
177+
#[test]
178+
fn deserialize_message_request_mode_pull() {
179+
let json = "\"PULL\"";
180+
let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap();
181+
assert_eq!(deserialized, MessageRequestMode::Pull);
182+
}
183+
184+
#[test]
185+
fn deserialize_message_request_mode_pop() {
186+
let json = "\"POP\"";
187+
let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap();
188+
assert_eq!(deserialized, MessageRequestMode::Pop);
189+
}
190+
191+
#[test]
192+
fn deserialize_message_request_mode_invalid() {
193+
let json = "\"INVALID\"";
194+
let deserialized: Result<MessageRequestMode, _> = serde_json::from_str(json);
195+
assert!(deserialized.is_err());
196+
}
111197
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::collections::HashMap;
18+
use std::hash::Hash;
19+
use std::hash::Hasher;
20+
21+
use cheetah_string::CheetahString;
22+
use serde::Deserialize;
23+
use serde::Serialize;
24+
25+
use crate::common::message::message_enum::MessageRequestMode;
26+
use crate::common::message::message_queue::MessageQueue;
27+
28+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
29+
#[serde(rename_all = "camelCase")]
30+
pub struct MessageQueueAssignment {
31+
pub message_queue: Option<MessageQueue>,
32+
pub mode: MessageRequestMode,
33+
pub attachments: Option<HashMap<CheetahString, CheetahString>>,
34+
}
35+
36+
impl Hash for MessageQueueAssignment {
37+
fn hash<H: Hasher>(&self, state: &mut H) {
38+
self.message_queue.hash(state);
39+
self.mode.hash(state);
40+
if let Some(ref attachments) = self.attachments {
41+
for (key, value) in attachments {
42+
key.hash(state);
43+
value.hash(state);
44+
}
45+
}
46+
}
47+
}
48+
49+
impl Default for MessageQueueAssignment {
50+
fn default() -> Self {
51+
MessageQueueAssignment {
52+
message_queue: None,
53+
mode: MessageRequestMode::Pull,
54+
attachments: None,
55+
}
56+
}
57+
}

‎rocketmq-remoting/src/protocol/body.rs

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub mod group_list;
3030
pub mod kv_table;
3131
pub mod pop_process_queue_info;
3232
pub mod process_queue_info;
33+
pub mod query_assignment_request_body;
34+
pub mod query_assignment_response_body;
3335
pub mod request;
3436
pub mod response;
3537
pub mod topic;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use cheetah_string::CheetahString;
18+
use serde::Deserialize;
19+
use serde::Serialize;
20+
21+
use crate::protocol::heartbeat::message_model::MessageModel;
22+
23+
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
24+
#[serde(rename_all = "camelCase")]
25+
pub struct QueryAssignmentRequestBody {
26+
pub topic: CheetahString,
27+
pub consumer_group: CheetahString,
28+
pub client_id: CheetahString,
29+
pub strategy_name: CheetahString,
30+
pub message_model: MessageModel,
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::collections::HashSet;
18+
19+
use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
20+
use serde::Deserialize;
21+
use serde::Serialize;
22+
23+
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
24+
#[serde(rename_all = "camelCase")]
25+
pub struct QueryAssignmentResponseBody {
26+
pub message_queue_assignments: HashSet<MessageQueueAssignment>,
27+
}

0 commit comments

Comments
 (0)