Skip to content

Commit 6a0591c

Browse files
authored
feat: improve message processing file (#101)
Signed-off-by: Michele Papalini <[email protected]>
1 parent 7bd8319 commit 6a0591c

File tree

6 files changed

+202
-315
lines changed

6 files changed

+202
-315
lines changed

data-plane/gateway/datapath/src/forwarder.rs

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,15 @@ where
8585
agent_id: Option<u64>,
8686
conn_index: u64,
8787
is_local: bool,
88+
add: bool,
8889
) -> Result<(), SubscriptionTableError> {
89-
self.subscription_table
90-
.add_subscription(agent_type, agent_id, conn_index, is_local)
90+
if add {
91+
self.subscription_table
92+
.add_subscription(agent_type, agent_id, conn_index, is_local)
93+
} else {
94+
self.subscription_table
95+
.remove_subscription(agent_type, agent_id, conn_index, is_local)
96+
}
9197
}
9298

9399
pub fn on_forwarded_subscription(
@@ -97,56 +103,38 @@ where
97103
name_type: AgentType,
98104
name_agent_id: Option<u64>,
99105
conn_index: u64,
106+
add: bool,
100107
) {
101108
let source = Agent::new(source_type, source_agent_id.unwrap_or(DEFAULT_AGENT_ID));
102109
let name = Agent::new(name_type, name_agent_id.unwrap_or(DEFAULT_AGENT_ID));
103-
self.remote_subscription_table
104-
.add_subscription(source, name, conn_index);
105-
}
106-
107-
pub fn on_unsubscription_msg(
108-
&self,
109-
agent_type: AgentType,
110-
agent_id: Option<u64>,
111-
conn_index: u64,
112-
is_local: bool,
113-
) -> Result<(), SubscriptionTableError> {
114-
self.subscription_table
115-
.remove_subscription(agent_type, agent_id, conn_index, is_local)
116-
}
117-
118-
pub fn on_forwarded_unsubscription(
119-
&self,
120-
source_type: AgentType,
121-
source_agent_id: Option<u64>,
122-
name_type: AgentType,
123-
name_agent_id: Option<u64>,
124-
conn_index: u64,
125-
) {
126-
let source = Agent::new(source_type, source_agent_id.unwrap_or(DEFAULT_AGENT_ID));
127-
let name = Agent::new(name_type, name_agent_id.unwrap_or(DEFAULT_AGENT_ID));
128-
self.remote_subscription_table
129-
.remove_subscription(source, name, conn_index);
130-
}
131-
132-
pub fn on_publish_msg_match_one(
133-
&self,
134-
agent_type: AgentType,
135-
agent_id: Option<u64>,
136-
incoming_conn: u64,
137-
) -> Result<u64, SubscriptionTableError> {
138-
self.subscription_table
139-
.match_one(agent_type, agent_id, incoming_conn)
110+
if add {
111+
self.remote_subscription_table
112+
.add_subscription(source, name, conn_index);
113+
} else {
114+
self.remote_subscription_table
115+
.remove_subscription(source, name, conn_index);
116+
}
140117
}
141118

142-
pub fn on_publish_msg_match_all(
119+
pub fn on_publish_msg_match(
143120
&self,
144121
agent_type: AgentType,
145122
agent_id: Option<u64>,
146123
incoming_conn: u64,
124+
fanout: u32,
147125
) -> Result<Vec<u64>, SubscriptionTableError> {
148-
self.subscription_table
149-
.match_all(agent_type, agent_id, incoming_conn)
126+
if fanout == 1 {
127+
match self
128+
.subscription_table
129+
.match_one(agent_type, agent_id, incoming_conn)
130+
{
131+
Ok(out) => Ok(vec![out]),
132+
Err(e) => Err(e),
133+
}
134+
} else {
135+
self.subscription_table
136+
.match_all(agent_type, agent_id, incoming_conn)
137+
}
150138
}
151139

152140
#[allow(dead_code)]
@@ -169,33 +157,33 @@ mod tests {
169157
let fwd = Forwarder::<u32>::new();
170158

171159
assert_eq!(
172-
fwd.on_subscription_msg(agent_class.clone(), None, 10, false),
160+
fwd.on_subscription_msg(agent_class.clone(), None, 10, false, true),
173161
Ok(())
174162
);
175163
assert_eq!(
176-
fwd.on_subscription_msg(agent_class.clone(), Some(1), 12, false),
164+
fwd.on_subscription_msg(agent_class.clone(), Some(1), 12, false, true),
177165
Ok(())
178166
);
179167
assert_eq!(
180168
// this creates a warning
181-
fwd.on_subscription_msg(agent_class.clone(), Some(1), 12, false),
169+
fwd.on_subscription_msg(agent_class.clone(), Some(1), 12, false, true),
182170
Ok(())
183171
);
184172
assert_eq!(
185-
fwd.on_publish_msg_match_one(agent_class.clone(), Some(1), 100),
186-
Ok(12)
173+
fwd.on_publish_msg_match(agent_class.clone(), Some(1), 100, 1),
174+
Ok(vec![12])
187175
);
188176
assert_eq!(
189-
fwd.on_publish_msg_match_one(agent_class.clone(), Some(2), 100),
177+
fwd.on_publish_msg_match(agent_class.clone(), Some(2), 100, 1),
190178
Err(SubscriptionTableError::NoMatch)
191179
);
192180

193181
assert_eq!(
194-
fwd.on_unsubscription_msg(agent_class.clone(), None, 10, false),
182+
fwd.on_subscription_msg(agent_class.clone(), None, 10, false, false),
195183
Ok(())
196184
);
197185
assert_eq!(
198-
fwd.on_unsubscription_msg(agent_class.clone(), None, 10, false),
186+
fwd.on_subscription_msg(agent_class.clone(), None, 10, false, false),
199187
Err(SubscriptionTableError::AgentIdNotFound)
200188
);
201189
}

0 commit comments

Comments
 (0)