Skip to content

Commit 8d3987f

Browse files
committed
[ISSUE #40]Support put kv config(request code 100)
1 parent c080b90 commit 8d3987f

File tree

12 files changed

+185
-15
lines changed

12 files changed

+185
-15
lines changed

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
[workspace]
22
members = [
3+
"rocketmq",
34
"rocketmq-common",
5+
"rocketmq-macros",
46
"rocketmq-namesrv",
5-
"rocketmq-remoting",
6-
"rocketmq"
7-
]
7+
"rocketmq-remoting"]
88
resolver = "2"
99

1010
[workspace.package]

rocketmq-macros/Cargo.toml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "rocketmq-macros"
3+
version = "0.1.0"
4+
authors.workspace = true
5+
edition.workspace = true
6+
homepage.workspace = true
7+
repository.workspace = true
8+
license.workspace = true
9+
keywords.workspace = true
10+
readme.workspace = true
11+
description.workspace = true
12+
13+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
14+
15+
[dependencies]

rocketmq-macros/src/lib.rs

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
pub fn add(left: usize, right: usize) -> usize {
2+
left + right
3+
}
4+
5+
#[cfg(test)]
6+
mod tests {
7+
use super::*;
8+
9+
#[test]
10+
fn it_works() {
11+
let result = add(2, 2);
12+
assert_eq!(result, 4);
13+
}
14+
}

rocketmq-namesrv/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "namesrv"
2+
name = "rocketmq-namesrv"
33
version = "0.1.0"
44
authors.workspace = true
55
edition.workspace = true

rocketmq-namesrv/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Feature list:
1616

1717
| Feature | request code | Support | remark |
1818
| -------------------------------------- | ------------ | -------------- | ------ |
19-
| Put KV Config | 100 | :broken_heart: | |
19+
| Put KV Config | 100 | :heart: | ISSUE #40 |
2020
| Get KV Config | 101 | :broken_heart: | |
2121
| Delete KV Config | 102 | :broken_heart: | |
2222
| Get kv list by namespace | 219 | :broken_heart: | |

rocketmq-namesrv/src/bin/bootstrap_server.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
use std::{collections::HashMap, path::PathBuf, sync::Arc};
1919

2020
use clap::Parser;
21-
use namesrv::{
21+
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
22+
use rocketmq_namesrv::{
2223
processor::{default_request_processor::DefaultRequestProcessor, ClientRequestProcessor},
2324
KVConfigManager, RouteInfoManager,
2425
};
25-
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
2626
use rocketmq_remoting::{
2727
code::request_code::RequestCode,
2828
runtime::{processor::RequestProcessor, server},
@@ -80,11 +80,14 @@ fn init_processors(
8080
RequestCode::GetRouteinfoByTopic.to_i32(),
8181
Box::new(ClientRequestProcessor::new(
8282
route_info_manager_inner.clone(),
83-
namesrv_config,
83+
namesrv_config.clone(),
8484
kvconfig_manager_inner.clone(),
8585
)),
8686
);
87-
(processors, DefaultRequestProcessor::new())
87+
(
88+
processors,
89+
DefaultRequestProcessor::new(namesrv_config.clone()),
90+
)
8891
}
8992

9093
#[derive(Parser, Debug)]

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tracing::info;
2222

2323
use crate::kvconfig::KVConfigSerializeWrapper;
2424

25+
#[derive(Debug)]
2526
pub struct KVConfigManager {
2627
pub(crate) config_table:
2728
HashMap<String /* Namespace */, HashMap<String /* Key */, String /* Value */>>,

rocketmq-namesrv/src/processor/default_request_processor.rs

+39-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919

2020
use bytes::Bytes;
2121
use rocketmq_common::{
22-
common::{mix_all, mq_version::RocketMqVersion},
22+
common::{mix_all, mq_version::RocketMqVersion, namesrv::namesrv_config::NamesrvConfig},
2323
CRC32Utils,
2424
};
2525
use rocketmq_remoting::{
@@ -29,19 +29,23 @@ use rocketmq_remoting::{
2929
broker_body::register_broker_body::RegisterBrokerBody,
3030
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
3131
},
32-
header::broker_request_header::RegisterBrokerRequestHeader,
32+
header::{
33+
broker_request_header::RegisterBrokerRequestHeader,
34+
namesrv::kv_config_request_header::PutKVConfigRequestHeader,
35+
},
3336
remoting_command::RemotingCommand,
3437
RemotingSerializable,
3538
},
3639
runtime::processor::RequestProcessor,
3740
};
3841
use tracing::warn;
3942

40-
use crate::route::route_info_manager::RouteInfoManager;
43+
use crate::{route::route_info_manager::RouteInfoManager, KVConfigManager};
4144

4245
#[derive(Debug, Clone)]
4346
pub struct DefaultRequestProcessor {
4447
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
48+
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
4549
}
4650

4751
impl RequestProcessor for DefaultRequestProcessor {
@@ -56,25 +60,55 @@ impl RequestProcessor for DefaultRequestProcessor {
5660
Some(RequestCode::GetBrokerClusterInfo) => {
5761
self.process_get_broker_cluster_info(request)
5862
}
63+
Some(RequestCode::PutKvConfig) => self.put_kv_config(request),
5964
_ => RemotingCommand::create_response_command_with_code(
6065
RemotingSysResponseCode::SystemError,
6166
),
6267
}
6368
}
6469
}
6570

71+
///implementation put KV config
72+
impl DefaultRequestProcessor {
73+
fn put_kv_config(&mut self, request: RemotingCommand) -> RemotingCommand {
74+
let request_header = request
75+
.decode_command_custom_header::<PutKVConfigRequestHeader>()
76+
.unwrap();
77+
//check namespace and key, need?
78+
if request_header.namespace.is_empty() || request_header.key.is_empty() {
79+
return RemotingCommand::create_response_command_with_code(
80+
RemotingSysResponseCode::SystemError,
81+
)
82+
.set_remark(Some(String::from("namespace or key is empty")));
83+
}
84+
self.kvconfig_manager.write().put_kv_config(
85+
request_header.namespace.as_str(),
86+
request_header.key.as_str(),
87+
request_header.value.as_str(),
88+
);
89+
RemotingCommand::create_response_command()
90+
}
91+
}
92+
6693
#[allow(clippy::new_without_default)]
6794
impl DefaultRequestProcessor {
68-
pub fn new() -> Self {
95+
pub fn new(namesrv_config: NamesrvConfig) -> Self {
6996
Self {
7097
route_info_manager: Arc::new(parking_lot::RwLock::new(RouteInfoManager::new())),
98+
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
99+
namesrv_config,
100+
))),
71101
}
72102
}
73103

74104
pub(crate) fn new_with_route_info_manager(
75105
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
106+
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
76107
) -> Self {
77-
Self { route_info_manager }
108+
Self {
109+
route_info_manager,
110+
kvconfig_manager,
111+
}
78112
}
79113
}
80114
impl DefaultRequestProcessor {

rocketmq-remoting/src/protocol/command_custom_header.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub trait CommandCustomHeader {
2323
/// Returns a `Result` indicating whether the fields are valid or not.
2424
/// If the fields are valid, the `Ok` variant is returned with an empty `()` value.
2525
/// If the fields are invalid, an `Err` variant is returned with an associated `Error` value.
26-
fn check_fields(&self) -> anyhow::Result<(), anyhow::Error>;
26+
fn check_fields(&self) -> anyhow::Result<(), anyhow::Error> {
27+
Ok(())
28+
}
2729

2830
/// Converts the implementing type to a map.
2931
///

rocketmq-remoting/src/protocol/header.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ pub mod broker;
1818
pub mod broker_request_header;
1919
pub mod client_request_header;
2020
pub mod kv_config_request_header;
21+
pub mod namesrv;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub mod kv_config_request_header;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::HashMap;
19+
20+
use serde::{Deserialize, Serialize};
21+
22+
use crate::protocol::command_custom_header::{CommandCustomHeader, FromMap};
23+
24+
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
25+
pub struct PutKVConfigRequestHeader {
26+
pub namespace: String,
27+
pub key: String,
28+
pub value: String,
29+
}
30+
31+
impl PutKVConfigRequestHeader {
32+
const NAMESPACE: &'static str = "namespace";
33+
const KEY: &'static str = "key";
34+
const VALUE: &'static str = "value";
35+
36+
/// Creates a new instance of `PutKVConfigRequestHeader`.
37+
///
38+
/// # Arguments
39+
///
40+
/// * `namespace` - The namespace.
41+
/// * `key` - The key.
42+
/// * `value` - The value.
43+
pub fn new(
44+
namespace: impl Into<String>,
45+
key: impl Into<String>,
46+
value: impl Into<String>,
47+
) -> Self {
48+
Self {
49+
namespace: namespace.into(),
50+
key: key.into(),
51+
value: value.into(),
52+
}
53+
}
54+
}
55+
56+
impl CommandCustomHeader for PutKVConfigRequestHeader {
57+
fn to_map(&self) -> Option<HashMap<String, String>> {
58+
Some(HashMap::from([
59+
(
60+
PutKVConfigRequestHeader::NAMESPACE.to_string(),
61+
self.namespace.clone(),
62+
),
63+
(PutKVConfigRequestHeader::KEY.to_string(), self.key.clone()),
64+
(
65+
PutKVConfigRequestHeader::VALUE.to_string(),
66+
self.value.clone(),
67+
),
68+
]))
69+
}
70+
}
71+
72+
impl FromMap for PutKVConfigRequestHeader {
73+
type Target = PutKVConfigRequestHeader;
74+
75+
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
76+
Some(PutKVConfigRequestHeader {
77+
namespace: map.get(PutKVConfigRequestHeader::NAMESPACE).cloned()?,
78+
key: map.get(PutKVConfigRequestHeader::KEY).cloned()?,
79+
value: map.get(PutKVConfigRequestHeader::VALUE).cloned()?,
80+
})
81+
}
82+
}

0 commit comments

Comments
 (0)