-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtcp_client.rs
168 lines (144 loc) · 5.69 KB
/
tcp_client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use mavio::dialects::minimal as dialect;
use mavio::io::{StdIoReader, StdIoWriter};
use mavio::protocol::{Dialect, V2};
use mavio::{Frame, Receiver, Sender};
use dialect::enums::{MavAutopilot, MavModeFlag, MavState, MavType};
use dialect::Minimal;
/// TCP address for server and clients.
const ADDRESS: &str = ":::56001";
/// Interval between sending messages.
const SEND_INTERVAL: Duration = Duration::from_millis(500);
/// Number of messages sent before stopping.
const N_ITER: usize = 10;
/// Number of clients.
const N_CLIENTS: usize = 5;
/// Listen to `n_iter` incoming frames and decode `HEARTBEAT` message.
fn listen<R: Read>(reader: R, whoami: String, n_iter: usize) -> mavio::error::Result<()> {
let mut receiver = Receiver::versionless(StdIoReader::new(reader));
for _ in 0..n_iter {
// Decode the entire frame
let frame = receiver.recv()?;
// Validate frame in the context of dialect specification (including checksum)
if let Err(err) = frame.validate_checksum::<Minimal>() {
log::warn!("[{whoami}] INVALID FRAME #{}: {err:?}", frame.sequence());
continue;
}
log::info!(
"[{whoami}] FRAME #{}: mavlink_version={:?} system_id={}, component_id={}",
frame.sequence(),
frame.version(),
frame.system_id(),
frame.component_id(),
);
// Decode message
match frame.decode() {
Ok(msg) => {
if let Minimal::Heartbeat(msg) = msg {
log::info!("[{whoami}] HEARTBEAT #{}: {msg:?}", frame.sequence());
} else {
// Some other message
log::info!("[{whoami}] MESSAGE #{}: {msg:?}", frame.sequence());
}
}
Err(err) => {
log::warn!("[{whoami}] DECODE ERROR #{}: {err:?}", frame.sequence());
}
}
}
Ok(())
}
/// Send `n_iter` heartbeat messages, then stops.
fn send_heartbeats<W: Write>(writer: W, whoami: String, n_iter: usize) -> mavio::error::Result<()> {
// Use a versionless sender that accepts both `MAVLink 1` and `MAVLink 2` frames.
let mut sender = Sender::versionless(StdIoWriter::new(writer));
// MAVLink connection settings
let mavlink_version = V2;
let system_id = 15;
let component_id = 42;
let mut sequence: u8 = 0;
for _ in 0..n_iter {
// Define message
let message = dialect::messages::Heartbeat {
type_: MavType::FixedWing,
autopilot: MavAutopilot::Generic,
base_mode: MavModeFlag::TEST_ENABLED & MavModeFlag::CUSTOM_MODE_ENABLED,
custom_mode: 0,
system_status: MavState::Active,
mavlink_version: Minimal::version().unwrap_or(0),
};
// Manually build frame from message (without `Endpoint`)
let frame = Frame::builder()
.sequence(sequence)
.system_id(system_id)
.component_id(component_id)
.version(mavlink_version)
.message(&message)?
.build()
.into_versionless();
if let Err(err) = sender.send(&frame) {
log::warn!("[{whoami}] SEND ERROR #{}: {err:?}", frame.sequence());
continue;
}
log::info!("[{whoami}] FRAME #{} SENT", sequence);
sequence = sequence.wrapping_add(1); // Increase sequence number
thread::sleep(SEND_INTERVAL);
}
Ok(())
}
/// Connect to server, listen to incoming messages, send `n_iter` heartbeats.
fn client(address: String, whoami: String, n_iter: usize) -> mavio::error::Result<String> {
let client = TcpStream::connect(address)?;
handle_stream(client, whoami.clone(), n_iter)?;
Ok(whoami)
}
/// Takes stream, sends `n` heartbeat messages, listens for incoming messages.
fn handle_stream(stream: TcpStream, whoami: String, n_iter: usize) -> mavio::error::Result<()> {
let reader = stream.try_clone()?;
let recv_name = format!("{} receiver", &whoami);
let send_name = format!("{} sender", &whoami);
// Spawn a thread that will listen to incoming messages
thread::spawn(move || -> mavio::error::Result<()> { listen(reader, recv_name, n_iter) });
// Send heartbeats
send_heartbeats(stream, send_name, n_iter)
}
/// Spawns `n_client` that will connect to server and send `n_iter` heartbeats.
fn spawn_clients(address: String, n_clients: usize, n_iter: usize) {
// Spawn clients
let (tx, rx) = mpsc::channel();
for i in 0..n_clients {
let channel = tx.clone();
let address = address.clone();
thread::spawn(move || {
channel
.send(client(address, format!("client #{i}"), n_iter))
.unwrap();
});
}
// Await clients to complete their jobs, then exit
for _ in 0..n_clients {
match rx.recv().unwrap() {
Ok(whoami) => {
log::info!("FINISHED: {whoami}.");
}
Err(err) => {
log::error!("Client ERROR: {err:?}");
}
}
}
}
/// Spawns [`N_CLIENTS`] clients. Each listens to incoming messages, sends [`N_ITER`] of heartbeat messages, then stops.
///
/// Requires server to work properly. Use `tcp_server` from library examples.
fn main() {
// Setup logger
env_logger::builder()
.filter_level(log::LevelFilter::Info) // Suppress everything below `info` for third-party modules.
.filter_module(env!("CARGO_PKG_NAME"), log::LevelFilter::Trace) // Allow everything from current package
.init();
spawn_clients(ADDRESS.to_string(), N_CLIENTS, N_ITER);
}