Skip to content

Commit 2a76cac

Browse files
authored
chore(statsd sink): refactor statsd sink to stream-based style (vectordotdev#16199)
This PR completely refactors the `statsd` sink in the "new style", which uses stream-based combinators to build a `Stream` implementation that drives component behavior. At a high-level, the PR is indeed for refactoring the sink, but ultimately includes as much, if not more, refactoring work around establishing reusable `Service`-based primitives for building other sinks like `statsd` i.e. `socket` or `syslog`. ## Reviewer Notes I've mostly copied the existing shared socket sink types -- `TcpSinkConfig`, etc -- and existing socket services -- `UdpService` -- and created consistent versions of them for TCP, UDP, and Unix Domain sockets. This includes a configuration type that is `Configurable`-compatible for all of them, with socket-specific configurations[1] and then methods for generating both the `Service` implementation and a `Healthcheck` implementation. Ultimately, this should form the basis of other sink refactors that use sockets directly (`socket`, `syslog`, etc) but it may be desirable to do some more trait-ifying to avoid some of the necessary boilerplate introduced here. ## Remaining Work - [x] fix normalizer unit tests + add one for sketches
1 parent b28d915 commit 2a76cac

File tree

31 files changed

+2346
-786
lines changed

31 files changed

+2346
-786
lines changed

.github/actions/spelling/allow.txt

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Enot
5252
Evercoss
5353
Explay
5454
FAQs
55+
FQDNs
5556
Fabro
5657
Figma
5758
Flipboard

lib/vector-common/src/internal_event/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ impl From<&'static str> for Protocol {
131131
}
132132
}
133133

134+
impl From<Protocol> for SharedString {
135+
fn from(value: Protocol) -> Self {
136+
value.0
137+
}
138+
}
139+
134140
/// Macro to take care of some of the repetitive boilerplate in implementing a registered event. See
135141
/// the other events in this module for examples of how to use this.
136142
///

src/internal_events/mod.rs

+1-12
Original file line numberDiff line numberDiff line change
@@ -253,18 +253,7 @@ pub(crate) use self::statsd_sink::*;
253253
pub(crate) use self::tag_cardinality_limit::*;
254254
#[cfg(feature = "transforms-throttle")]
255255
pub(crate) use self::throttle::*;
256-
#[cfg(all(
257-
any(
258-
feature = "sinks-socket",
259-
feature = "sinks-statsd",
260-
feature = "sources-dnstap",
261-
feature = "sources-metrics",
262-
feature = "sources-statsd",
263-
feature = "sources-syslog",
264-
feature = "sources-socket"
265-
),
266-
unix
267-
))]
256+
#[cfg(unix)]
268257
pub(crate) use self::unix::*;
269258
#[cfg(feature = "sinks-websocket")]
270259
pub(crate) use self::websocket::*;

src/internal_events/socket.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub enum SocketMode {
1313
}
1414

1515
impl SocketMode {
16-
const fn as_str(self) -> &'static str {
16+
pub const fn as_str(self) -> &'static str {
1717
match self {
1818
Self::Tcp => "tcp",
1919
Self::Udp => "udp",

src/internal_events/statsd_sink.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use vector_common::internal_event::{
1010
#[derive(Debug)]
1111
pub struct StatsdInvalidMetricError<'a> {
1212
pub value: &'a MetricValue,
13-
pub kind: &'a MetricKind,
13+
pub kind: MetricKind,
1414
}
1515

1616
impl<'a> InternalEvent for StatsdInvalidMetricError<'a> {

src/internal_events/unix.rs

+28
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,34 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
9090
}
9191
}
9292

93+
#[derive(Debug)]
94+
pub struct UnixSendIncompleteError {
95+
pub data_size: usize,
96+
pub sent: usize,
97+
}
98+
99+
impl InternalEvent for UnixSendIncompleteError {
100+
fn emit(self) {
101+
let reason = "Could not send all data in one Unix datagram.";
102+
error!(
103+
message = reason,
104+
data_size = self.data_size,
105+
sent = self.sent,
106+
dropped = self.data_size - self.sent,
107+
error_type = error_type::WRITER_FAILED,
108+
stage = error_stage::SENDING,
109+
internal_log_rate_limit = true,
110+
);
111+
counter!(
112+
"component_errors_total", 1,
113+
"error_type" => error_type::WRITER_FAILED,
114+
"stage" => error_stage::SENDING,
115+
);
116+
117+
emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
118+
}
119+
}
120+
93121
#[derive(Debug)]
94122
pub struct UnixSocketFileDeleteError<'a> {
95123
pub path: &'a Path,

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub mod line_agg;
8383
pub mod list;
8484
#[cfg(any(feature = "sources-nats", feature = "sinks-nats"))]
8585
pub(crate) mod nats;
86+
pub mod net;
8687
#[allow(unreachable_pub)]
8788
pub(crate) mod proto;
8889
pub mod providers;
@@ -112,7 +113,6 @@ pub mod trace;
112113
#[allow(unreachable_pub)]
113114
pub mod transforms;
114115
pub mod types;
115-
pub mod udp;
116116
pub mod unit_test;
117117
pub(crate) mod utilization;
118118
pub mod validate;

src/net.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
//! Networking-related helper functions.
2+
3+
use std::{io, time::Duration};
4+
5+
use socket2::{SockRef, TcpKeepalive};
6+
use tokio::net::TcpStream;
7+
8+
/// Sets the receive buffer size for a socket.
9+
///
10+
/// This is the equivalent of setting the `SO_RCVBUF` socket setting directly.
11+
///
12+
/// # Errors
13+
///
14+
/// If there is an error setting the receive buffer size on the given socket, or if the value given
15+
/// as the socket is not a valid socket, an error variant will be returned explaining the underlying
16+
/// I/O error.
17+
pub fn set_receive_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()>
18+
where
19+
SockRef<'s>: From<&'s S>,
20+
{
21+
SockRef::from(socket).set_recv_buffer_size(size)
22+
}
23+
24+
/// Sets the send buffer size for a socket.
25+
///
26+
/// This is the equivalent of setting the `SO_SNDBUF` socket setting directly.
27+
///
28+
/// # Errors
29+
///
30+
/// If there is an error setting the send buffer size on the given socket, or if the value given
31+
/// as the socket is not a valid socket, an error variant will be returned explaining the underlying
32+
/// I/O error.
33+
pub fn set_send_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()>
34+
where
35+
SockRef<'s>: From<&'s S>,
36+
{
37+
SockRef::from(socket).set_send_buffer_size(size)
38+
}
39+
40+
/// Sets the TCP keepalive behavior on a socket.
41+
///
42+
/// This is the equivalent of setting the `SO_KEEPALIVE` and `TCP_KEEPALIVE` socket settings
43+
/// directly.
44+
///
45+
/// # Errors
46+
///
47+
/// If there is an error with either enabling keepalive probes or setting the TCP keepalive idle
48+
/// timeout on the given socket, an error variant will be returned explaining the underlying I/O
49+
/// error.
50+
pub fn set_keepalive(socket: &TcpStream, ttl: Duration) -> io::Result<()> {
51+
SockRef::from(socket).set_tcp_keepalive(&TcpKeepalive::new().with_time(ttl))
52+
}

0 commit comments

Comments
 (0)