Skip to content

Commit 0e0c5d9

Browse files
oteffahiDenisBiryukov91OlivierHecart
authored
[ZEN-530] Add link protocols subject to interceptors (#1850)
* Rework AuthId types * Add InterceptorLink config enum * Add link protocol subject to ACL * Add ACL link_protocols tests * Remove LinkAuthId::None variant * Add link_protocols subject to Downsampling * Update DEFAULT_CONFIG * Address clippy warning * Fix typo in enum variant * Address clippy warning * Remove outdated doc * Use NEVec to avoid boilerplate config-checking code * Support link-protocols in qos-overwrite * clippy fix * update DEFAULT_CONFIG.json * Homogenize DEFAULT_CONFIG.json5 doc comments * Update tracing * Log line number when error parsing json config --------- Co-authored-by: Denis Biryukov <[email protected]> Co-authored-by: OlivierHecart <[email protected]>
1 parent be7372d commit 0e0c5d9

File tree

34 files changed

+619
-367
lines changed

34 files changed

+619
-367
lines changed

Cargo.lock

Lines changed: 13 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ libloading = "0.8"
121121
tracing = "0.1"
122122
lz4_flex = "0.11"
123123
nix = { version = "0.29.0", features = ["fs"] }
124+
nonempty-collections = {version = "0.3.0", features = ["serde"]}
124125
num_cpus = "1.16.0"
125126
num-traits = { version = "0.2.19", default-features = false }
126127
once_cell = "1.19.0"

DEFAULT_CONFIG.json5

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@
261261
// "lo0",
262262
// "en0",
263263
// ],
264+
// /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten.
265+
// /// If absent, the overwrite will be applied to all transports. An empty list is invalid.
266+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
264267
// /// List of message types to apply to.
265268
// messages: [
266269
// "put", // put publications
@@ -312,8 +315,11 @@
312315
// /// Optional Id, has to be unique
313316
// "id": "wlan0egress",
314317
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
315-
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
318+
// /// If absent, the rules will be applied to all interfaces. An empty list is invalid.
316319
// interfaces: [ "wlan0" ],
320+
// /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered.
321+
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
322+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
317323
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
318324
// /// If absent, the rules will be applied to both flows.
319325
// flow: ["ingress", "egress"],
@@ -412,6 +418,12 @@
412418
// "id": "subject3",
413419
// /// An empty subject combination is a wildcard
414420
// },
421+
// {
422+
// "id": "subject4",
423+
// /// link protocols can also be used to identify transports to filter messages on.
424+
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
425+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
426+
// },
415427
// ],
416428
// /// The policies list associates rules to subjects
417429
// "policies":
@@ -426,7 +438,7 @@
426438
// },
427439
// {
428440
// "rules": ["rule2"],
429-
// "subjects": ["subject3"],
441+
// "subjects": ["subject3", "subject4"],
430442
// },
431443
// ]
432444
//},

commons/zenoh-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ serde_json = { workspace = true }
3737
serde_with = { workspace = true }
3838
serde_yaml = { workspace = true }
3939
validated_struct = { workspace = true, features = ["json5", "json_get"] }
40+
nonempty-collections = {workspace = true }
4041
zenoh-core = { workspace = true }
4142
zenoh-keyexpr = { workspace = true }
4243
zenoh-protocol = { workspace = true }

commons/zenoh-config/src/lib.rs

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ use std::{
3232
};
3333

3434
use include::recursive_include;
35-
use qos::{PublisherQoSConfList, QosOverwriteItemConf};
35+
use nonempty_collections::NEVec;
36+
use qos::{PublisherQoSConfList, QosOverwriteMessage, QosOverwrites};
3637
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
3738
use serde::{Deserialize, Serialize};
3839
use serde_json::{Map, Value};
@@ -111,30 +112,54 @@ pub struct DownsamplingItemConf {
111112
pub id: Option<String>,
112113
/// A list of interfaces to which the downsampling will be applied
113114
/// Downsampling will be applied for all interfaces if the parameter is None
114-
pub interfaces: Option<Vec<String>>,
115+
pub interfaces: Option<NEVec<String>>,
116+
/// A list of link types, transports having one of those link types will have the downsampling applied
117+
/// Downsampling will be applied for all link types if the parameter is None
118+
pub link_protocols: Option<NEVec<InterceptorLink>>,
115119
// list of message types on which the downsampling will be applied
116-
pub messages: Vec<DownsamplingMessage>,
117-
/// A list of interfaces to which the downsampling will be applied.
118-
pub rules: Vec<DownsamplingRuleConf>,
120+
pub messages: NEVec<DownsamplingMessage>,
121+
/// A list of downsampling rules: key_expression and the maximum frequency in Hertz
122+
pub rules: NEVec<DownsamplingRuleConf>,
119123
/// Downsampling flow directions: egress and/or ingress
120-
pub flows: Option<Vec<InterceptorFlow>>,
124+
pub flows: Option<NEVec<InterceptorFlow>>,
121125
}
122126

123127
#[derive(Serialize, Debug, Deserialize, Clone)]
124128
pub struct AclConfigRule {
125129
pub id: String,
126-
pub key_exprs: Vec<String>,
127-
pub messages: Vec<AclMessage>,
128-
pub flows: Option<Vec<InterceptorFlow>>,
130+
pub key_exprs: NEVec<OwnedKeyExpr>,
131+
pub messages: NEVec<AclMessage>,
132+
pub flows: Option<NEVec<InterceptorFlow>>,
129133
pub permission: Permission,
130134
}
131135

132136
#[derive(Serialize, Debug, Deserialize, Clone)]
133137
pub struct AclConfigSubjects {
134138
pub id: String,
135-
pub interfaces: Option<Vec<Interface>>,
136-
pub cert_common_names: Option<Vec<CertCommonName>>,
137-
pub usernames: Option<Vec<Username>>,
139+
pub interfaces: Option<NEVec<Interface>>,
140+
pub cert_common_names: Option<NEVec<CertCommonName>>,
141+
pub usernames: Option<NEVec<Username>>,
142+
pub link_protocols: Option<NEVec<InterceptorLink>>,
143+
}
144+
145+
#[derive(Debug, Deserialize, Serialize, Clone)]
146+
pub struct QosOverwriteItemConf {
147+
/// Optional identifier for the qos modification configuration item.
148+
pub id: Option<String>,
149+
/// A list of interfaces to which the qos will be applied.
150+
/// QosOverwrite will be applied for all interfaces if the parameter is None.
151+
pub interfaces: Option<NEVec<String>>,
152+
/// A list of link types, transports having one of those link types will have the qos overwrite applied
153+
/// Qos overwrite will be applied for all link types if the parameter is None.
154+
pub link_protocols: Option<NEVec<InterceptorLink>>,
155+
/// List of message types on which the qos overwrite will be applied.
156+
pub messages: NEVec<QosOverwriteMessage>,
157+
/// List of key expressions to apply qos overwrite.
158+
pub key_exprs: Vec<OwnedKeyExpr>,
159+
// The qos value to overwrite with.
160+
pub overwrite: QosOverwrites,
161+
/// QosOverwrite flow directions: egress and/or ingress.
162+
pub flows: Option<NEVec<InterceptorFlow>>,
138163
}
139164

140165
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
@@ -164,6 +189,26 @@ impl std::fmt::Display for Username {
164189
}
165190
}
166191

192+
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
193+
#[serde(rename_all = "kebab-case")]
194+
pub enum InterceptorLink {
195+
Tcp,
196+
Udp,
197+
Tls,
198+
Quic,
199+
Serial,
200+
Unixpipe,
201+
UnixsockStream,
202+
Vsock,
203+
Ws,
204+
}
205+
206+
impl std::fmt::Display for InterceptorLink {
207+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208+
write!(f, "Transport({:?})", self)
209+
}
210+
}
211+
167212
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
168213
pub struct AclConfigPolicyEntry {
169214
pub id: Option<String>,
@@ -174,7 +219,7 @@ pub struct AclConfigPolicyEntry {
174219
#[derive(Clone, Serialize, Debug, Deserialize)]
175220
pub struct PolicyRule {
176221
pub subject_id: usize,
177-
pub key_expr: String,
222+
pub key_expr: OwnedKeyExpr,
178223
pub message: AclMessage,
179224
pub permission: Permission,
180225
pub flow: InterceptorFlow,
@@ -908,13 +953,13 @@ impl Config {
908953
Some("json") | Some("json5") => match json5::Deserializer::from_str(&content) {
909954
Ok(mut d) => Config::from_deserializer(&mut d).map_err(|e| match e {
910955
Ok(c) => zerror!("Invalid configuration: {}", c).into(),
911-
Err(e) => zerror!("JSON error: {}", e).into(),
956+
Err(e) => zerror!("JSON error: {:?}", e).into(),
912957
}),
913958
Err(e) => bail!(e),
914959
},
915960
Some("yaml") | Some("yml") => Config::from_deserializer(serde_yaml::Deserializer::from_str(&content)).map_err(|e| match e {
916961
Ok(c) => zerror!("Invalid configuration: {}", c).into(),
917-
Err(e) => zerror!("YAML error: {}", e).into(),
962+
Err(e) => zerror!("YAML error: {:?}", e).into(),
918963
}),
919964
Some(other) => bail!("Unsupported file type '.{}' (.json, .json5 and .yaml are supported)", other),
920965
None => bail!("Unsupported file type. Configuration files must have an extension (.json, .json5 and .yaml supported)")

commons/zenoh-config/src/qos.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ use serde::{Deserialize, Serialize};
1515
use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree};
1616
use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Priority, Reliability};
1717

18-
use crate::InterceptorFlow;
19-
2018
#[derive(Debug, Deserialize, Default, Serialize, Clone)]
2119
pub struct PublisherQoSConfList(pub(crate) Vec<PublisherQoSConf>);
2220

@@ -134,23 +132,6 @@ pub enum PublisherLocalityConf {
134132
Any,
135133
}
136134

137-
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
138-
pub struct QosOverwriteItemConf {
139-
/// Optional identifier for the qos modification configuration item.
140-
pub id: Option<String>,
141-
/// A list of interfaces to which the qos will be applied.
142-
/// QosOverwrite will be applied for all interfaces if the parameter is None.
143-
pub interfaces: Option<Vec<String>>,
144-
/// List of message types on which the qos overwrite will be applied.
145-
pub messages: Vec<QosOverwriteMessage>,
146-
/// List of key expressions to apply qos overwrite.
147-
pub key_exprs: Vec<OwnedKeyExpr>,
148-
// The qos value to overwrite with.
149-
pub overwrite: QosOverwrites,
150-
/// QosOverwrite flow directions: egress and/or ingress.
151-
pub flows: Option<Vec<InterceptorFlow>>,
152-
}
153-
154135
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
155136
#[serde(rename_all = "snake_case")]
156137
pub enum QosOverwriteMessage {

io/zenoh-link-commons/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl Link {
105105
mtu: link.get_mtu(),
106106
is_streamed: false,
107107
interfaces: vec![],
108-
auth_identifier: LinkAuthId::default(),
108+
auth_identifier: link.get_auth_id().clone(),
109109
priorities: None,
110110
reliability: None,
111111
}

io/zenoh-link-commons/src/multicast.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use zenoh_protocol::{
2727
};
2828
use zenoh_result::{zerror, ZResult};
2929

30+
use crate::LinkAuthId;
31+
3032
/*************************************/
3133
/* MANAGER */
3234
/*************************************/
@@ -48,6 +50,7 @@ pub trait LinkMulticastTrait: Send + Sync {
4850
fn get_mtu(&self) -> BatchSize;
4951
fn get_src(&self) -> &Locator;
5052
fn get_dst(&self) -> &Locator;
53+
fn get_auth_id(&self) -> &LinkAuthId;
5154
fn is_reliable(&self) -> bool;
5255
async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
5356
async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;

io/zenoh-link-commons/src/unicast.rs

Lines changed: 10 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -123,74 +123,14 @@ pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec<String> {
123123
}
124124

125125
#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
126-
pub enum LinkAuthType {
127-
Tls,
128-
Quic,
129-
None,
130-
}
131-
132-
#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
133-
pub struct LinkAuthId {
134-
auth_type: LinkAuthType,
135-
auth_value: Option<String>,
136-
}
137-
138-
impl LinkAuthId {
139-
pub const NONE: Self = Self {
140-
auth_type: LinkAuthType::None,
141-
auth_value: None,
142-
};
143-
pub fn get_type(&self) -> &LinkAuthType {
144-
&self.auth_type
145-
}
146-
pub fn get_value(&self) -> &Option<String> {
147-
&self.auth_value
148-
}
149-
pub fn builder() -> LinkAuthIdBuilder {
150-
LinkAuthIdBuilder::new()
151-
}
152-
}
153-
154-
impl Default for LinkAuthId {
155-
fn default() -> Self {
156-
LinkAuthId::NONE.clone()
157-
}
158-
}
159-
160-
#[derive(Debug)]
161-
pub struct LinkAuthIdBuilder {
162-
pub auth_type: LinkAuthType, // HAS to be provided when building
163-
pub auth_value: Option<String>, // actual value added to the above type; is None for None type
164-
}
165-
166-
impl Default for LinkAuthIdBuilder {
167-
fn default() -> Self {
168-
Self::new()
169-
}
170-
}
171-
172-
impl LinkAuthIdBuilder {
173-
pub fn new() -> LinkAuthIdBuilder {
174-
LinkAuthIdBuilder {
175-
auth_type: LinkAuthType::None,
176-
auth_value: None,
177-
}
178-
}
179-
180-
pub fn auth_type(mut self, auth_type: LinkAuthType) -> Self {
181-
self.auth_type = auth_type;
182-
self
183-
}
184-
185-
pub fn auth_value(mut self, auth_value: Option<String>) -> Self {
186-
self.auth_value = auth_value;
187-
self
188-
}
189-
190-
pub fn build(self) -> LinkAuthId {
191-
LinkAuthId {
192-
auth_type: self.auth_type.clone(),
193-
auth_value: self.auth_value.clone(),
194-
}
195-
}
126+
pub enum LinkAuthId {
127+
Tls(Option<String>),
128+
Quic(Option<String>),
129+
Tcp,
130+
Udp,
131+
Serial,
132+
Unixpipe,
133+
UnixsockStream,
134+
Vsock,
135+
Ws,
196136
}

0 commit comments

Comments
 (0)