Skip to content

Commit 43304cb

Browse files
committed
[ISSUE #40]Support put kv config(request code 100)
1 parent 81b4b61 commit 43304cb

File tree

7 files changed

+148
-8
lines changed

7 files changed

+148
-8
lines changed

rocketmq-namesrv/src/bin/bootstrap_server.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,14 @@ fn init_processors(
8080
RequestCode::GetRouteinfoByTopic.to_i32(),
8181
Box::new(ClientRequestProcessor::new(
8282
route_info_manager_inner.clone(),
83-
namesrv_config,
83+
namesrv_config.clone(),
8484
kvconfig_manager_inner.clone(),
8585
)),
8686
);
87-
(processors, DefaultRequestProcessor::new())
87+
(
88+
processors,
89+
DefaultRequestProcessor::new(namesrv_config.clone()),
90+
)
8891
}
8992

9093
#[derive(Parser, Debug)]

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tracing::info;
2222

2323
use crate::kvconfig::KVConfigSerializeWrapper;
2424

25+
#[derive(Debug)]
2526
pub struct KVConfigManager {
2627
pub(crate) config_table:
2728
HashMap<String /* Namespace */, HashMap<String /* Key */, String /* Value */>>,

rocketmq-namesrv/src/processor/default_request_processor.rs

+38-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919

2020
use bytes::Bytes;
2121
use rocketmq_common::{
22-
common::{mix_all, mq_version::RocketMqVersion},
22+
common::{mix_all, mq_version::RocketMqVersion, namesrv::namesrv_config::NamesrvConfig},
2323
CRC32Utils,
2424
};
2525
use rocketmq_remoting::{
@@ -29,19 +29,23 @@ use rocketmq_remoting::{
2929
broker_body::register_broker_body::RegisterBrokerBody,
3030
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
3131
},
32-
header::broker_request_header::RegisterBrokerRequestHeader,
32+
header::{
33+
broker_request_header::RegisterBrokerRequestHeader,
34+
namesrv::kv_config_request_header::PutKVConfigRequestHeader,
35+
},
3336
remoting_command::RemotingCommand,
3437
RemotingSerializable,
3538
},
3639
runtime::processor::RequestProcessor,
3740
};
3841
use tracing::warn;
3942

40-
use crate::route::route_info_manager::RouteInfoManager;
43+
use crate::{route::route_info_manager::RouteInfoManager, KVConfigManager};
4144

4245
#[derive(Debug, Clone)]
4346
pub struct DefaultRequestProcessor {
4447
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
48+
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
4549
}
4650

4751
impl RequestProcessor for DefaultRequestProcessor {
@@ -63,18 +67,47 @@ impl RequestProcessor for DefaultRequestProcessor {
6367
}
6468
}
6569

70+
///implementation put KV config
71+
impl DefaultRequestProcessor {
72+
fn put_kv_config(&mut self, request: RemotingCommand) -> RemotingCommand {
73+
let request_header = request
74+
.decode_command_custom_header::<PutKVConfigRequestHeader>()
75+
.unwrap();
76+
//check namespace and key, need?
77+
if request_header.namespace.is_empty() || request_header.key.is_empty() {
78+
return RemotingCommand::create_response_command_with_code(
79+
RemotingSysResponseCode::SystemError,
80+
)
81+
.set_remark(Some(String::from("namespace or key is empty")));
82+
}
83+
self.kvconfig_manager.write().put_kv_config(
84+
request_header.namespace.as_str(),
85+
request_header.key.as_str(),
86+
request_header.value.as_str(),
87+
);
88+
RemotingCommand::create_response_command()
89+
}
90+
}
91+
6692
#[allow(clippy::new_without_default)]
6793
impl DefaultRequestProcessor {
68-
pub fn new() -> Self {
94+
pub fn new(namesrv_config: NamesrvConfig) -> Self {
6995
Self {
7096
route_info_manager: Arc::new(parking_lot::RwLock::new(RouteInfoManager::new())),
97+
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
98+
namesrv_config,
99+
))),
71100
}
72101
}
73102

74103
pub(crate) fn new_with_route_info_manager(
75104
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
105+
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
76106
) -> Self {
77-
Self { route_info_manager }
107+
Self {
108+
route_info_manager,
109+
kvconfig_manager,
110+
}
78111
}
79112
}
80113
impl DefaultRequestProcessor {

rocketmq-remoting/src/protocol/command_custom_header.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub trait CommandCustomHeader {
2323
/// Returns a `Result` indicating whether the fields are valid or not.
2424
/// If the fields are valid, the `Ok` variant is returned with an empty `()` value.
2525
/// If the fields are invalid, an `Err` variant is returned with an associated `Error` value.
26-
fn check_fields(&self) -> anyhow::Result<(), anyhow::Error>;
26+
fn check_fields(&self) -> anyhow::Result<(), anyhow::Error> {
27+
Ok(())
28+
}
2729

2830
/// Converts the implementing type to a map.
2931
///

rocketmq-remoting/src/protocol/header.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ pub mod broker;
1818
pub mod broker_request_header;
1919
pub mod client_request_header;
2020
pub mod kv_config_request_header;
21+
pub mod namesrv;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub mod kv_config_request_header;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::HashMap;
19+
20+
use serde::{Deserialize, Serialize};
21+
22+
use crate::protocol::command_custom_header::{CommandCustomHeader, FromMap};
23+
24+
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
25+
pub struct PutKVConfigRequestHeader {
26+
pub namespace: String,
27+
pub key: String,
28+
pub value: String,
29+
}
30+
31+
impl PutKVConfigRequestHeader {
32+
const NAMESPACE: &'static str = "namespace";
33+
const KEY: &'static str = "key";
34+
const VALUE: &'static str = "value";
35+
36+
/// Creates a new instance of `PutKVConfigRequestHeader`.
37+
///
38+
/// # Arguments
39+
///
40+
/// * `namespace` - The namespace.
41+
/// * `key` - The key.
42+
/// * `value` - The value.
43+
pub fn new(
44+
namespace: impl Into<String>,
45+
key: impl Into<String>,
46+
value: impl Into<String>,
47+
) -> Self {
48+
Self {
49+
namespace: namespace.into(),
50+
key: key.into(),
51+
value: value.into(),
52+
}
53+
}
54+
}
55+
56+
impl CommandCustomHeader for PutKVConfigRequestHeader {
57+
fn to_map(&self) -> Option<HashMap<String, String>> {
58+
Some(HashMap::from([
59+
(
60+
PutKVConfigRequestHeader::NAMESPACE.to_string(),
61+
self.namespace.clone(),
62+
),
63+
(PutKVConfigRequestHeader::KEY.to_string(), self.key.clone()),
64+
(
65+
PutKVConfigRequestHeader::VALUE.to_string(),
66+
self.value.clone(),
67+
),
68+
]))
69+
}
70+
}
71+
72+
impl FromMap for PutKVConfigRequestHeader {
73+
type Target = PutKVConfigRequestHeader;
74+
75+
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
76+
Some(PutKVConfigRequestHeader {
77+
namespace: map.get(PutKVConfigRequestHeader::NAMESPACE).cloned()?,
78+
key: map.get(PutKVConfigRequestHeader::KEY).cloned()?,
79+
value: map.get(PutKVConfigRequestHeader::VALUE).cloned()?,
80+
})
81+
}
82+
}

0 commit comments

Comments
 (0)