Skip to content

Commit aefaaa0

Browse files
authored
feat: new message format (#88)
Signed-off-by: Michele Papalini <[email protected]>
1 parent 86b19f2 commit aefaaa0

File tree

19 files changed

+1171
-819
lines changed

19 files changed

+1171
-819
lines changed

data-plane/examples/src/sdk-mock/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use clap::Parser;
55
use tracing::info;
66

7-
use agp_datapath::messages::encoder::{encode_agent_class, encode_agent_from_string};
7+
use agp_datapath::messages::encoder::{encode_agent, encode_agent_type};
88
use agp_gw::config;
99

1010
mod args;
@@ -38,20 +38,20 @@ async fn main() {
3838

3939
// create local agent
4040
let agent_id = 0;
41-
let agent_name = encode_agent_from_string("cisco", "default", local_agent, agent_id);
41+
let agent_name = encode_agent("cisco", "default", local_agent, agent_id);
4242
let mut rx = svc.create_agent(agent_name.clone());
4343

4444
// connect to the remote gateway
4545
let conn_id = svc.connect(None).await.unwrap();
4646
info!("remote connection id = {}", conn_id);
4747

48-
let local_agent_class = encode_agent_class("cisco", "default", local_agent);
49-
svc.subscribe(&local_agent_class, Some(agent_id), conn_id)
48+
let local_agent_type = encode_agent_type("cisco", "default", local_agent);
49+
svc.subscribe(&local_agent_type, Some(agent_id), conn_id)
5050
.await
5151
.unwrap();
5252

5353
// Set a route for the remote agent
54-
let route = encode_agent_class("cisco", "default", remote_agent);
54+
let route = encode_agent_type("cisco", "default", remote_agent);
5555
info!("allowing messages to remote agent: {:?}", route);
5656
svc.set_route(&route, None, conn_id).await.unwrap();
5757

data-plane/gateway/datapath/pubsub/proto/v1/pubsub.proto

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,78 @@ service PubSubService {
99
}
1010

1111
message Subscribe {
12-
AgentId source = 1;
13-
AgentId name = 2;
12+
AGPHeader header = 1;
1413
}
1514

1615
message Unsubscribe {
17-
AgentId source = 1;
18-
AgentId name = 2;
16+
AGPHeader header = 1;
1917
}
2018

2119
message Publish {
22-
AgentId source = 1;
23-
AgentId name = 2;
20+
AGPHeader header = 1;
21+
ServiceHeader control = 2;
2422
uint32 fanout = 3;
2523
Content msg = 4;
2624
}
2725

28-
message AgentGroup {
26+
// recvFrom = connection form where the sub/unsub is supposed to be received
27+
// forwardTo = connection where to forward the message
28+
// incomingConn = connection from where the packet was received
29+
// error = if true the publication contains an error notification
30+
message AGPHeader {
31+
Agent source = 1;
32+
Agent destination = 2;
33+
optional uint64 recvFrom = 3;
34+
optional uint64 forwardTo = 4;
35+
optional uint64 incomingConn = 5;
36+
optional bool error = 6;
37+
}
38+
39+
message Agent {
2940
uint64 organization = 1;
3041
uint64 namespace = 2;
42+
uint64 agent_type = 3;
43+
optional uint64 agent_id = 4;
3144
}
3245

33-
message AgentClass {
34-
AgentGroup group = 1;
35-
uint64 class = 2;
46+
enum ServiceHeaderType {
47+
CTRL_UNSPECIFIED = 0;
48+
CTRL_FNF = 1;
49+
CTRL_REQUEST = 2;
50+
CTRL_REPLY = 3;
51+
CTRL_STREAM = 4;
52+
CTRL_RTX_REQUEST = 5;
53+
CTRL_RTX_REPLY = 6;
3654
}
3755

38-
message AgentId {
39-
AgentClass class = 1;
40-
optional uint64 id = 2;
56+
// Service.id meaning according to ControlType
57+
// CTRL_FNF = nonce
58+
// CTRL_REQUEST = nonce
59+
// CTRL_REPLY = nonce of the associated CTRL_REQUEST
60+
// CTRL_STREAM = sequential incremental ID
61+
// CTRL_RTX_REQUEST = nonce
62+
// CTRL_RTX_REPLY = nonce of the associated CTRL_RTX_REQUEST
63+
64+
// Service.stream meaning according to ControlType
65+
// CTRL_STREAM = stream for which the id field is valid
66+
// CTRL_RTX_REQUEST = stream for this CTRL_RTX_REQUEST
67+
// CTRL_RTX_REPLY = stream for this CTRL_RTX_REPLY
68+
// None in all the other cases
69+
70+
// Service.rtx meaning according to ControlType
71+
// CTRL_RTX_REQUEST = id of the messing packet to retransmit
72+
// CTRL_RTX_REPLY = id of the retransmitted packet
73+
// None in all the other cases
74+
75+
message ServiceHeader {
76+
ServiceHeaderType header_type = 1;
77+
uint32 id = 2;
78+
optional uint32 stream = 3;
79+
optional uint32 rtx = 4;
4180
}
4281

4382
message Content {
44-
string contentType = 1;
83+
string content_type = 1;
4584
bytes blob = 2;
4685
}
4786

data-plane/gateway/datapath/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ pub enum DataPathError {
1111
DisconnectionError(String),
1212
#[error("unkwon message type {0}")]
1313
UnknownMsgType(String),
14+
#[error("unable to set incoming connection")]
15+
ErrorSettingInConnection(String),
1416
#[error("error handling subscription: {0}")]
1517
SubscriptionError(String),
1618
#[error("error handling unsubscription: {0}")]

data-plane/gateway/datapath/src/forwarder.rs

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::tables::remote_subscription_table::RemoteSubscriptions;
99
use super::tables::subscription_table::SubscriptionTableImpl;
1010
use super::tables::{errors::SubscriptionTableError, SubscriptionTable};
1111
use crate::messages::encoder::DEFAULT_AGENT_ID;
12-
use crate::messages::{Agent, AgentClass};
12+
use crate::messages::{Agent, AgentType};
1313
use crate::tables::remote_subscription_table::SubscriptionInfo;
1414

1515
#[derive(Debug)]
@@ -81,84 +81,72 @@ where
8181

8282
pub fn on_subscription_msg(
8383
&self,
84-
class: AgentClass,
84+
agent_type: AgentType,
8585
agent_id: Option<u64>,
8686
conn_index: u64,
8787
is_local: bool,
8888
) -> Result<(), SubscriptionTableError> {
8989
self.subscription_table
90-
.add_subscription(class, agent_id, conn_index, is_local)
90+
.add_subscription(agent_type, agent_id, conn_index, is_local)
9191
}
9292

9393
pub fn on_forwarded_subscription(
9494
&self,
95-
source_class: AgentClass,
95+
source_type: AgentType,
9696
source_agent_id: Option<u64>,
97-
name_class: AgentClass,
97+
name_type: AgentType,
9898
name_agent_id: Option<u64>,
9999
conn_index: u64,
100100
) {
101-
let source = Agent {
102-
agent_class: source_class,
103-
agent_id: source_agent_id.unwrap_or(DEFAULT_AGENT_ID),
104-
};
105-
let name = Agent {
106-
agent_class: name_class,
107-
agent_id: name_agent_id.unwrap_or(DEFAULT_AGENT_ID),
108-
};
101+
let source = Agent::new(source_type, source_agent_id.unwrap_or(DEFAULT_AGENT_ID));
102+
let name = Agent::new(name_type, name_agent_id.unwrap_or(DEFAULT_AGENT_ID));
109103
self.remote_subscription_table
110104
.add_subscription(source, name, conn_index);
111105
}
112106

113107
pub fn on_unsubscription_msg(
114108
&self,
115-
class: AgentClass,
109+
agent_type: AgentType,
116110
agent_id: Option<u64>,
117111
conn_index: u64,
118112
is_local: bool,
119113
) -> Result<(), SubscriptionTableError> {
120114
self.subscription_table
121-
.remove_subscription(class, agent_id, conn_index, is_local)
115+
.remove_subscription(agent_type, agent_id, conn_index, is_local)
122116
}
123117

124118
pub fn on_forwarded_unsubscription(
125119
&self,
126-
source_class: AgentClass,
120+
source_type: AgentType,
127121
source_agent_id: Option<u64>,
128-
name_class: AgentClass,
122+
name_type: AgentType,
129123
name_agent_id: Option<u64>,
130124
conn_index: u64,
131125
) {
132-
let source = Agent {
133-
agent_class: source_class,
134-
agent_id: source_agent_id.unwrap_or(DEFAULT_AGENT_ID),
135-
};
136-
let name = Agent {
137-
agent_class: name_class,
138-
agent_id: name_agent_id.unwrap_or(DEFAULT_AGENT_ID),
139-
};
126+
let source = Agent::new(source_type, source_agent_id.unwrap_or(DEFAULT_AGENT_ID));
127+
let name = Agent::new(name_type, name_agent_id.unwrap_or(DEFAULT_AGENT_ID));
140128
self.remote_subscription_table
141129
.remove_subscription(source, name, conn_index);
142130
}
143131

144132
pub fn on_publish_msg_match_one(
145133
&self,
146-
class: AgentClass,
134+
agent_type: AgentType,
147135
agent_id: Option<u64>,
148136
incoming_conn: u64,
149137
) -> Result<u64, SubscriptionTableError> {
150138
self.subscription_table
151-
.match_one(class, agent_id, incoming_conn)
139+
.match_one(agent_type, agent_id, incoming_conn)
152140
}
153141

154142
pub fn on_publish_msg_match_all(
155143
&self,
156-
class: AgentClass,
144+
agent_type: AgentType,
157145
agent_id: Option<u64>,
158146
incoming_conn: u64,
159147
) -> Result<Vec<u64>, SubscriptionTableError> {
160148
self.subscription_table
161-
.match_all(class, agent_id, incoming_conn)
149+
.match_all(agent_type, agent_id, incoming_conn)
162150
}
163151

164152
#[allow(dead_code)]
@@ -170,13 +158,13 @@ where
170158
#[cfg(test)]
171159
mod tests {
172160
use super::*;
173-
use crate::messages::encoder::encode_agent_class;
161+
use crate::messages::encoder::encode_agent_type;
174162
use tracing_test::traced_test;
175163

176164
#[test]
177165
#[traced_test]
178166
fn test_forwarder() {
179-
let agent_class = encode_agent_class("Cisco", "Default", "class_ONE");
167+
let agent_class = encode_agent_type("Cisco", "Default", "class_ONE");
180168

181169
let fwd = Forwarder::<u32>::new();
182170

0 commit comments

Comments
 (0)