Skip to content

Commit 59b6dea

Browse files
micpapalmsardara
andauthored
feat: streaming test app (#144)
Signed-off-by: Mauro Sardara <[email protected]> Signed-off-by: Michele Papalini <[email protected]> Co-authored-by: Mauro Sardara <[email protected]>
1 parent c4f75e7 commit 59b6dea

File tree

3 files changed

+262
-78
lines changed

3 files changed

+262
-78
lines changed

data-plane/gateway/service/src/streaming.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ impl Streaming {
242242
&source,
243243
pkt_src.agent_type(),
244244
Some(pkt_src.agent_id()),
245-
Some(AgpHeaderFlags::default().with_incoming_conn(incoming_conn)),
245+
Some(AgpHeaderFlags::default().with_forward_to(incoming_conn)),
246246
));
247247

248248
let session_header = Some(SessionHeader::new(
@@ -340,14 +340,14 @@ impl Streaming {
340340
}
341341
}
342342
for r in rtx {
343-
debug!("send a rtx for message {} from receiver session {}", r, session_id);
343+
debug!("packet loss detected on session {}, send RTX for id {}", session_id, r);
344344
let dest = producer_name.as_ref().unwrap(); // this cannot panic a this point
345345

346346
let agp_header = Some(AgpHeader::new(
347347
&source,
348348
dest.agent_type(),
349349
Some(dest.agent_id()),
350-
Some(AgpHeaderFlags::default().with_incoming_conn(producer_conn.unwrap())),
350+
Some(AgpHeaderFlags::default().with_forward_to(producer_conn.unwrap())),
351351
));
352352

353353
let session_header = Some(SessionHeader::new(
@@ -478,7 +478,6 @@ impl Session for Streaming {
478478
message: SessionMessage,
479479
direction: MessageDirection,
480480
) -> Result<(), SessionError> {
481-
debug!("receive message");
482481
self.tx
483482
.send(Ok((message.message, direction)))
484483
.await
@@ -801,14 +800,15 @@ mod tests {
801800
tx_app_receiver,
802801
);
803802

804-
let message = Message::new_publish(
803+
let mut message = Message::new_publish(
805804
&Agent::from_strings("cisco", "default", "sender", 0),
806805
&AgentType::from_strings("cisco", "default", "receiver"),
807806
Some(0),
808807
Some(AgpHeaderFlags::default().with_incoming_conn(0)),
809808
"msg",
810809
vec![0x1, 0x2, 0x3, 0x4],
811810
);
811+
message.set_incoming_conn(Some(0));
812812

813813
let session_msg: SessionMessage = SessionMessage::new(message.clone(), Info::new(0));
814814
// send 3 messages from the producer app
@@ -823,14 +823,16 @@ mod tests {
823823
// read the 3 messages from the sender gw channel
824824
// forward message 1 and 3 to the receiver
825825
for i in 0..3 {
826-
let msg = rx_gw_sender.recv().await.unwrap().unwrap();
826+
let mut msg = rx_gw_sender.recv().await.unwrap().unwrap();
827827
let msg_header = msg.get_session_header();
828828
assert_eq!(msg_header.session_id, 0);
829829
assert_eq!(msg_header.message_id, i);
830830
assert_eq!(msg_header.header_type, SessionHeaderType::Stream.into());
831831

832832
// the receiver should detect a loss for packet 1
833833
if i != 1 {
834+
// make sure to set the incoming connection to avoid paninc
835+
msg.set_incoming_conn(Some(0));
834836
let session_msg: SessionMessage = SessionMessage::new(msg.clone(), Info::new(0));
835837
let res = receiver
836838
.on_message(session_msg.clone(), MessageDirection::North)
@@ -893,7 +895,9 @@ mod tests {
893895
);
894896

895897
// send the second reply to the producer
896-
let session_msg: SessionMessage = SessionMessage::new(msg.clone(), Info::new(0));
898+
let mut session_msg: SessionMessage = SessionMessage::new(msg.clone(), Info::new(0));
899+
// make sure to set the incoming connection to avoid paninc
900+
session_msg.message.set_incoming_conn(Some(0));
897901
let res = sender
898902
.on_message(session_msg.clone(), MessageDirection::North)
899903
.await;
@@ -917,7 +921,9 @@ mod tests {
917921
)
918922
);
919923

920-
let session_msg: SessionMessage = SessionMessage::new(msg.clone(), Info::new(0));
924+
let mut session_msg: SessionMessage = SessionMessage::new(msg.clone(), Info::new(0));
925+
// make sure to set the incoming connection to avoid paninc
926+
session_msg.message.set_incoming_conn(Some(0));
921927
let res = receiver
922928
.on_message(session_msg.clone(), MessageDirection::North)
923929
.await;

data-plane/testing/src/bin/publisher.rs

Lines changed: 158 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ use std::fs::File;
55
use std::io::prelude::*;
66
use std::{collections::HashMap, sync::Arc};
77

8-
use agp_datapath::messages::Agent;
8+
use agp_datapath::messages::{Agent, AgentType};
9+
use agp_service::AgpHeaderFlags;
910
use parking_lot::RwLock;
1011
use testing::parse_line;
1112
use tokio_util::sync::CancellationToken;
1213

14+
use agp_service::streaming::StreamingConfiguration;
15+
1316
use agp_gw::config;
1417
use clap::Parser;
1518
use indicatif::ProgressBar;
@@ -18,9 +21,19 @@ use tracing::{debug, error, info};
1821
#[derive(Parser, Debug)]
1922
#[command(version, about, long_about = None)]
2023
pub struct Args {
21-
/// Workload input file
22-
#[arg(short, long, value_name = "WORKLOAD", required = true)]
23-
workload: String,
24+
/// Workload input file, required if used in workload mode. If this is set the streaming mode is set to false.
25+
#[arg(short, long, value_name = "WORKLOAD", required = false)]
26+
workload: Option<String>,
27+
28+
/// Runs in streaming mode.
29+
#[arg(
30+
short,
31+
long,
32+
value_name = "STREAMING",
33+
required = false,
34+
default_value_t = false
35+
)]
36+
streaming: bool,
2437

2538
/// Agp config file
2639
#[arg(short, long, value_name = "CONFIGURATION", required = true)]
@@ -54,22 +67,30 @@ pub struct Args {
5467
#[arg(
5568
short,
5669
long,
57-
value_name = "SLEEP",
70+
value_name = "FREQUENCY",
5871
required = false,
5972
default_value_t = 0
6073
)]
61-
sleep: u32,
74+
frequency: u32,
75+
76+
/// used only in streaming mode, defines the maximum number of packets to send
77+
#[arg(short, long, value_name = "PACKETS", required = false)]
78+
max_packets: Option<u64>,
6279
}
6380

6481
impl Args {
6582
pub fn msg_size(&self) -> &u32 {
6683
&self.msg_size
6784
}
6885

69-
pub fn workload(&self) -> &String {
86+
pub fn workload(&self) -> &Option<String> {
7087
&self.workload
7188
}
7289

90+
pub fn streaming(&self) -> &bool {
91+
&self.streaming
92+
}
93+
7394
pub fn id(&self) -> &u64 {
7495
&self.id
7596
}
@@ -82,8 +103,12 @@ impl Args {
82103
&self.quite
83104
}
84105

85-
pub fn sleep(&self) -> &u32 {
86-
&self.sleep
106+
pub fn frequency(&self) -> &u32 {
107+
&self.frequency
108+
}
109+
110+
pub fn max_packets(&self) -> &Option<u64> {
111+
&self.max_packets
87112
}
88113
}
89114

@@ -95,22 +120,137 @@ async fn main() {
95120
let config_file = args.config();
96121
let msg_size = *args.msg_size();
97122
let id = *args.id();
98-
let sleep = *args.sleep();
123+
let frequency = *args.frequency();
124+
let mut streaming = *args.streaming();
125+
let max_packets = args.max_packets;
126+
if input.is_some() {
127+
streaming = false;
128+
}
99129

100-
// setup agent config
130+
info!(
131+
"configuration -- workload file: {}, agent config {}, publisher id: {}, streaming mode: {}, msg size: {}",
132+
input.as_ref().unwrap_or(&"None".to_string()),
133+
config_file,
134+
id,
135+
streaming,
136+
msg_size,
137+
);
138+
139+
// start local agent
140+
// get service
101141
let mut config = config::load_config(config_file).expect("failed to load configuration");
102142
let _guard = config.tracing.setup_tracing_subscriber();
143+
let svc_id = agp_config::component::id::ID::new_with_str("gateway/0").unwrap();
144+
let svc = config.services.get_mut(&svc_id).unwrap();
103145

104-
info!(
105-
"configuration -- input file: {}, agent config: {}, msg size: {}",
106-
input, config_file, msg_size
107-
);
146+
// create local agent
147+
let agent_name = Agent::from_strings("cisco", "default", "publisher", id);
148+
// required in streaming mode
149+
let dest_name = AgentType::from_strings("cisco", "default", "subscriber");
150+
let mut rx = svc
151+
.create_agent(&agent_name)
152+
.expect("failed to create agent");
153+
154+
// connect to the remote gateway
155+
let conn_id = svc.connect(None).await.unwrap();
156+
info!("remote connection id = {}", conn_id);
157+
158+
// subscribe for local name
159+
match svc
160+
.subscribe(
161+
&agent_name,
162+
agent_name.agent_type(),
163+
agent_name.agent_id_option(),
164+
Some(conn_id),
165+
)
166+
.await
167+
{
168+
Ok(_) => {}
169+
Err(e) => {
170+
panic!("an error accoured while adding a subscription {}", e);
171+
}
172+
}
173+
174+
svc.set_route(&agent_name, &dest_name, None, conn_id)
175+
.await
176+
.unwrap();
177+
178+
// wait for the connection to be established
179+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
108180

181+
// STREAMING MODE
182+
if streaming {
183+
// create streaming session
184+
let res = svc
185+
.create_session(
186+
&agent_name,
187+
agp_service::session::SessionConfig::Streaming(StreamingConfiguration {
188+
source: agent_name.clone(),
189+
max_retries: None,
190+
timeout: None,
191+
}),
192+
)
193+
.await;
194+
if res.is_err() {
195+
panic!("error creating fire and forget session");
196+
}
197+
198+
// loop to listen to errors coming from the local gateway
199+
tokio::spawn(async move {
200+
loop {
201+
match rx.recv().await {
202+
None => {
203+
info!(%conn_id, "end of stream");
204+
break;
205+
}
206+
Some(msg_info) => {
207+
if msg_info.is_err() {
208+
error!("received an error message {:?}", msg_info);
209+
continue;
210+
} else {
211+
panic!("received a message from the gateway, this should never happen");
212+
}
213+
}
214+
}
215+
}
216+
});
217+
218+
// get the session
219+
let session_info = res.unwrap();
220+
221+
for i in 0..max_packets.unwrap_or(u64::MAX) {
222+
let payload: Vec<u8> = vec![120; msg_size as usize]; // ASCII for 'x' = 120
223+
info!("publishing message {}", i);
224+
// set fanout > 1 to send the message in broadcast
225+
let flags = AgpHeaderFlags::new(10, None, None, None, None);
226+
if svc
227+
.publish_with_flags(
228+
&agent_name,
229+
session_info.clone(),
230+
&dest_name,
231+
None,
232+
flags,
233+
payload,
234+
)
235+
.await
236+
.is_err()
237+
{
238+
error!("an error occurred sending publication, the test will fail",);
239+
}
240+
if frequency != 0 {
241+
tokio::time::sleep(tokio::time::Duration::from_millis(frequency as u64)).await;
242+
}
243+
}
244+
return;
245+
}
246+
247+
// WORKLOAD MODE
248+
// setup agent config
109249
let mut publication_list = HashMap::new();
110250
let mut oracle = HashMap::new();
111251
let mut routes = Vec::new();
112252

113-
let res = File::open(input);
253+
let res = File::open(input.as_ref().unwrap());
114254
if res.is_err() {
115255
panic!("error opening the input file");
116256
}
@@ -142,37 +282,6 @@ async fn main() {
142282
}
143283
}
144284

145-
// start local agent
146-
// get service
147-
let svc_id = agp_config::component::id::ID::new_with_str("gateway/0").unwrap();
148-
let svc = config.services.get_mut(&svc_id).unwrap();
149-
150-
// create local agent
151-
let agent_name = Agent::from_strings("cisco", "default", "publisher", id);
152-
let mut rx = svc
153-
.create_agent(&agent_name)
154-
.expect("failed to create agent");
155-
156-
// connect to the remote gateway
157-
let conn_id = svc.connect(None).await.unwrap();
158-
info!("remote connection id = {}", conn_id);
159-
160-
// subscribe for local name
161-
match svc
162-
.subscribe(
163-
&agent_name,
164-
agent_name.agent_type(),
165-
agent_name.agent_id_option(),
166-
Some(conn_id),
167-
)
168-
.await
169-
{
170-
Ok(_) => {}
171-
Err(e) => {
172-
panic!("an error accoured while adding a subscription {}", e);
173-
}
174-
}
175-
176285
// set routes for all subscriptions
177286
for r in routes {
178287
match svc
@@ -317,8 +426,8 @@ async fn main() {
317426
bar.inc(1);
318427
}
319428

320-
if sleep != 0 {
321-
tokio::time::sleep(tokio::time::Duration::from_millis(sleep as u64)).await;
429+
if frequency != 0 {
430+
tokio::time::sleep(tokio::time::Duration::from_millis(frequency as u64)).await;
322431
}
323432
}
324433
let duration = start.elapsed();

0 commit comments

Comments
 (0)