Skip to content

chore(statsd sink): refactor statsd sink to stream-based style #16199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
db9f4ca
wip commit
tobz Jan 19, 2023
e5280d9
wip
tobz Jan 20, 2023
a22763e
massive rework
tobz Jan 30, 2023
0ab950b
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Jan 30, 2023
bacf1ff
add example metadata
tobz Jan 30, 2023
a17e839
port doc examples + keepalive for TCP
tobz Jan 30, 2023
8609e9f
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Feb 16, 2023
a08bc84
start refactoring based on PR feedback
tobz Feb 16, 2023
b32e933
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Feb 17, 2023
6037898
lot o cleanup
tobz Feb 17, 2023
f28379f
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Feb 22, 2023
cd9fe58
start futzing with an opaque wrapper for tcp/udp/udx
tobz Feb 22, 2023
8c1dd18
more
tobz Feb 23, 2023
36f5627
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Apr 11, 2023
577efa9
a bunch of rework to simplify the newer stuff
tobz Apr 11, 2023
f67a09e
revert unrelated changes to DD metrics sink
tobz Apr 11, 2023
f0f531b
fix tests + make encoding more efficient
tobz Apr 13, 2023
667ce41
clippy lint + trying to sort of socket2 stuff for cross-platform
tobz Apr 14, 2023
2144849
upgrade to tokio 1.27.0 + cleanup socket asfd/asrawfd stuff
tobz Apr 17, 2023
78e633b
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Apr 17, 2023
fe9a495
always include the unix related internal events
tobz Apr 17, 2023
392c4d7
fix booboos
tobz Apr 17, 2023
fce04a2
make generated docs line up
tobz Apr 17, 2023
332dff2
sighhhh
tobz Apr 17, 2023
7664aa7
bring in TLS support for TCP service
tobz Apr 18, 2023
35cde29
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz Apr 18, 2023
292b171
set a default unix mode to avoid breaking change with addition of uni…
tobz Apr 19, 2023
1373f1e
Merge branch 'master' into tobz/rewrite-statsd-sink
tobz May 24, 2023
74d9bff
consolidate hostandport desc + remove unused UdpService
tobz May 24, 2023
f6d7292
some PR cleanup
tobz May 24, 2023
be7c5ef
fix fmt
tobz May 24, 2023
d35bcf4
fix handling of encoding request splits + add more accurate batch siz…
tobz May 25, 2023
43b3c3e
add missing file
tobz May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ impl From<&'static str> for Protocol {
}
}

impl From<Protocol> for SharedString {
fn from(value: Protocol) -> Self {
value.0
}
}

/// Macro to take care of some of the repetitive boilerplate in implementing a registered event. See
/// the other events in this module for examples of how to use this.
///
Expand Down
13 changes: 1 addition & 12 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,18 +253,7 @@ pub(crate) use self::statsd_sink::*;
pub(crate) use self::tag_cardinality_limit::*;
#[cfg(feature = "transforms-throttle")]
pub(crate) use self::throttle::*;
#[cfg(all(
any(
feature = "sinks-socket",
feature = "sinks-statsd",
feature = "sources-dnstap",
feature = "sources-metrics",
feature = "sources-statsd",
feature = "sources-syslog",
feature = "sources-socket"
),
unix
))]
#[cfg(unix)]
pub(crate) use self::unix::*;
#[cfg(feature = "sinks-websocket")]
pub(crate) use self::websocket::*;
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum SocketMode {
}

impl SocketMode {
const fn as_str(self) -> &'static str {
pub const fn as_str(self) -> &'static str {
match self {
Self::Tcp => "tcp",
Self::Udp => "udp",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/statsd_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vector_common::internal_event::{
#[derive(Debug)]
pub struct StatsdInvalidMetricError<'a> {
pub value: &'a MetricValue,
pub kind: &'a MetricKind,
pub kind: MetricKind,
}

impl<'a> InternalEvent for StatsdInvalidMetricError<'a> {
Expand Down
28 changes: 28 additions & 0 deletions src/internal_events/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,34 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
}
}

#[derive(Debug)]
pub struct UnixSendIncompleteError {
pub data_size: usize,
pub sent: usize,
}

impl InternalEvent for UnixSendIncompleteError {
fn emit(self) {
let reason = "Could not send all data in one Unix datagram.";
error!(
message = reason,
data_size = self.data_size,
sent = self.sent,
dropped = self.data_size - self.sent,
error_type = error_type::WRITER_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::WRITER_FAILED,
"stage" => error_stage::SENDING,
);

emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
}
}

#[derive(Debug)]
pub struct UnixSocketFileDeleteError<'a> {
pub path: &'a Path,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub mod line_agg;
pub mod list;
#[cfg(any(feature = "sources-nats", feature = "sinks-nats"))]
pub(crate) mod nats;
pub mod net;
#[allow(unreachable_pub)]
pub(crate) mod proto;
pub mod providers;
Expand Down Expand Up @@ -112,7 +113,6 @@ pub mod trace;
#[allow(unreachable_pub)]
pub mod transforms;
pub mod types;
pub mod udp;
pub mod unit_test;
pub(crate) mod utilization;
pub mod validate;
Expand Down
52 changes: 52 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Networking-related helper functions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a curiosity not an issue - I noticed this is directly in src/ and that we have some other files in there that could conceivably be placed in something like a "utils" folder or something similar. Just wondering if that has ever been brought up. On second look we have a src/common that might have a similar goal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this could/should actually live in vector-core.

Ironically, vector-core already has a nearly identical file in there doing almost the exact same thing, but when I went to convert it all over, it felt like a lot of toil/change on top of an already large PR, and I felt like it would be an easy lay-up PR to follow up with.

use std::{io, time::Duration};

use socket2::{SockRef, TcpKeepalive};
use tokio::net::TcpStream;

/// Sets the receive buffer size for a socket.
///
/// This is the equivalent of setting the `SO_RCVBUF` socket setting directly.
///
/// # Errors
///
/// If there is an error setting the receive buffer size on the given socket, or if the value given
/// as the socket is not a valid socket, an error variant will be returned explaining the underlying
/// I/O error.
pub fn set_receive_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()>
where
SockRef<'s>: From<&'s S>,
{
SockRef::from(socket).set_recv_buffer_size(size)
}

/// Sets the send buffer size for a socket.
///
/// This is the equivalent of setting the `SO_SNDBUF` socket setting directly.
///
/// # Errors
///
/// If there is an error setting the send buffer size on the given socket, or if the value given
/// as the socket is not a valid socket, an error variant will be returned explaining the underlying
/// I/O error.
pub fn set_send_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()>
where
SockRef<'s>: From<&'s S>,
{
SockRef::from(socket).set_send_buffer_size(size)
}

/// Sets the TCP keepalive behavior on a socket.
///
/// This is the equivalent of setting the `SO_KEEPALIVE` and `TCP_KEEPALIVE` socket settings
/// directly.
///
/// # Errors
///
/// If there is an error with either enabling keepalive probes or setting the TCP keepalive idle
/// timeout on the given socket, an error variant will be returned explaining the underlying I/O
/// error.
pub fn set_keepalive(socket: &TcpStream, ttl: Duration) -> io::Result<()> {
SockRef::from(socket).set_tcp_keepalive(&TcpKeepalive::new().with_time(ttl))
}
Loading