Skip to content

Commit 908f07c

Browse files
committed
enhancement(socket sink): support unix datagram mode
1 parent 6d1d521 commit 908f07c

File tree

4 files changed

+299
-43
lines changed

4 files changed

+299
-43
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:
2+
- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
3+
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)
4+
5+
This option only applies when `mode = "unix"`.
6+
7+
authors: jpovixwm

src/sinks/socket.rs

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {
159159

160160
#[cfg(test)]
161161
mod test {
162-
#[cfg(unix)]
163-
use std::path::PathBuf;
164162
use std::{
165163
future::ready,
166164
net::{SocketAddr, UdpSocket},
167165
};
166+
#[cfg(unix)]
167+
use std::{os::unix::net::UnixDatagram, path::PathBuf};
168168

169169
use futures::stream::StreamExt;
170170
use futures_util::stream;
@@ -196,14 +196,42 @@ mod test {
196196
crate::test_util::test_generate_config::<SocketSinkConfig>();
197197
}
198198

199-
async fn test_udp(addr: SocketAddr) {
200-
let receiver = UdpSocket::bind(addr).unwrap();
199+
enum DatagramSocket {
200+
Udp(UdpSocket),
201+
#[cfg(unix)]
202+
Unix(UnixDatagram),
203+
}
204+
205+
enum DatagramSocketAddr {
206+
Udp(SocketAddr),
207+
#[cfg(unix)]
208+
Unix(PathBuf),
209+
}
210+
211+
async fn test_datagram(datagram_addr: DatagramSocketAddr) {
212+
let receiver = match &datagram_addr {
213+
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
214+
#[cfg(unix)]
215+
DatagramSocketAddr::Unix(path) => {
216+
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
217+
}
218+
};
201219

202220
let config = SocketSinkConfig {
203-
mode: Mode::Udp(UdpMode {
204-
config: UdpSinkConfig::from_address(addr.to_string()),
205-
encoding: JsonSerializerConfig::default().into(),
206-
}),
221+
mode: match &datagram_addr {
222+
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
223+
config: UdpSinkConfig::from_address(addr.to_string()),
224+
encoding: JsonSerializerConfig::default().into(),
225+
}),
226+
#[cfg(unix)]
227+
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
228+
config: UnixSinkConfig::new(
229+
path.to_path_buf(),
230+
crate::sinks::util::service::net::UnixMode::Datagram,
231+
),
232+
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
233+
}),
234+
},
207235
acknowledgements: Default::default(),
208236
};
209237

@@ -218,9 +246,13 @@ mod test {
218246
.expect("Running sink failed");
219247

220248
let mut buf = [0; 256];
221-
let (size, _src_addr) = receiver
222-
.recv_from(&mut buf)
223-
.expect("Did not receive message");
249+
let size = match &receiver {
250+
DatagramSocket::Udp(sock) => {
251+
sock.recv_from(&mut buf).expect("Did not receive message").0
252+
}
253+
#[cfg(unix)]
254+
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
255+
};
224256

225257
let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
226258
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
@@ -234,14 +266,25 @@ mod test {
234266
async fn udp_ipv4() {
235267
trace_init();
236268

237-
test_udp(next_addr()).await;
269+
test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
238270
}
239271

240272
#[tokio::test]
241273
async fn udp_ipv6() {
242274
trace_init();
243275

244-
test_udp(next_addr_v6()).await;
276+
test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
277+
}
278+
279+
#[cfg(unix)]
280+
#[tokio::test]
281+
async fn unix_datagram() {
282+
trace_init();
283+
284+
test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
285+
"unix_datagram_socket_test",
286+
)))
287+
.await;
245288
}
246289

247290
#[tokio::test]
@@ -292,7 +335,10 @@ mod test {
292335

293336
let config = SocketSinkConfig {
294337
mode: Mode::Unix(UnixMode {
295-
config: UnixSinkConfig::new(out_path),
338+
config: UnixSinkConfig::new(
339+
out_path,
340+
crate::sinks::util::service::net::UnixMode::Stream,
341+
),
296342
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
297343
}),
298344
acknowledgements: Default::default(),

0 commit comments

Comments
 (0)