diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 7b0334fa0e905..c3a73ef09667c 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -52,6 +52,7 @@ Enot Evercoss Explay FAQs +FQDNs Fabro Figma Flipboard diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index dcf0c42114fe7..37d0dcfc634d9 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -131,6 +131,12 @@ impl From<&'static str> for Protocol { } } +impl From<Protocol> for SharedString { + fn from(value: Protocol) -> Self { + value.0 + } +} + /// Macro to take care of some of the repetitive boilerplate in implementing a registered event. See /// the other events in this module for examples of how to use this. /// diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index e8ce5bad9e4c9..f357cbb0f469b 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -253,18 +253,7 @@ pub(crate) use self::statsd_sink::*; pub(crate) use self::tag_cardinality_limit::*; #[cfg(feature = "transforms-throttle")] pub(crate) use self::throttle::*; -#[cfg(all( - any( - feature = "sinks-socket", - feature = "sinks-statsd", - feature = "sources-dnstap", - feature = "sources-metrics", - feature = "sources-statsd", - feature = "sources-syslog", - feature = "sources-socket" - ), - unix -))] +#[cfg(unix)] pub(crate) use self::unix::*; #[cfg(feature = "sinks-websocket")] pub(crate) use self::websocket::*; diff --git a/src/internal_events/socket.rs b/src/internal_events/socket.rs index 468701a1fa194..28ac92270f3d6 100644 --- a/src/internal_events/socket.rs +++ b/src/internal_events/socket.rs @@ -13,7 +13,7 @@ pub enum SocketMode { } impl SocketMode { - const fn as_str(self) -> &'static str { + pub const fn as_str(self) -> &'static str { match self { Self::Tcp => "tcp", Self::Udp => "udp", diff --git a/src/internal_events/statsd_sink.rs b/src/internal_events/statsd_sink.rs index 248cddf8cfd5f..27e33fd0aa89b 100644 --- a/src/internal_events/statsd_sink.rs +++ b/src/internal_events/statsd_sink.rs @@ -10,7 +10,7 @@ use vector_common::internal_event::{ #[derive(Debug)] pub struct StatsdInvalidMetricError<'a> { pub value: &'a MetricValue, - pub kind: &'a MetricKind, + pub kind: MetricKind, } impl<'a> InternalEvent for StatsdInvalidMetricError<'a> { diff --git a/src/internal_events/unix.rs b/src/internal_events/unix.rs index a74c2b7e2c12b..2f004ec38b826 100644 --- a/src/internal_events/unix.rs +++ b/src/internal_events/unix.rs @@ -90,6 +90,34 @@ impl InternalEvent for UnixSocketSendError<'_, E> { } } +#[derive(Debug)] +pub struct UnixSendIncompleteError { + pub data_size: usize, + pub sent: usize, +} + +impl InternalEvent for UnixSendIncompleteError { + fn emit(self) { + let reason = "Could not send all data in one Unix datagram."; + error!( + message = reason, + data_size = self.data_size, + sent = self.sent, + dropped = self.data_size - self.sent, + error_type = error_type::WRITER_FAILED, + stage = error_stage::SENDING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::WRITER_FAILED, + "stage" => error_stage::SENDING, + ); + + emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason }); + } +} + #[derive(Debug)] pub struct UnixSocketFileDeleteError<'a> { pub path: &'a Path, diff --git a/src/lib.rs b/src/lib.rs index 910c244dd4ade..37d71f87922ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ pub 
mod line_agg; pub mod list; #[cfg(any(feature = "sources-nats", feature = "sinks-nats"))] pub(crate) mod nats; +pub mod net; #[allow(unreachable_pub)] pub(crate) mod proto; pub mod providers; @@ -112,7 +113,6 @@ pub mod trace; #[allow(unreachable_pub)] pub mod transforms; pub mod types; -pub mod udp; pub mod unit_test; pub(crate) mod utilization; pub mod validate; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000000000..f62abfa32974a --- /dev/null +++ b/src/net.rs @@ -0,0 +1,52 @@ +//! Networking-related helper functions. + +use std::{io, time::Duration}; + +use socket2::{SockRef, TcpKeepalive}; +use tokio::net::TcpStream; + +/// Sets the receive buffer size for a socket. +/// +/// This is the equivalent of setting the `SO_RCVBUF` socket setting directly. +/// +/// # Errors +/// +/// If there is an error setting the receive buffer size on the given socket, or if the value given +/// as the socket is not a valid socket, an error variant will be returned explaining the underlying +/// I/O error. +pub fn set_receive_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()> +where + SockRef<'s>: From<&'s S>, +{ + SockRef::from(socket).set_recv_buffer_size(size) +} + +/// Sets the send buffer size for a socket. +/// +/// This is the equivalent of setting the `SO_SNDBUF` socket setting directly. +/// +/// # Errors +/// +/// If there is an error setting the send buffer size on the given socket, or if the value given +/// as the socket is not a valid socket, an error variant will be returned explaining the underlying +/// I/O error. +pub fn set_send_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()> +where + SockRef<'s>: From<&'s S>, +{ + SockRef::from(socket).set_send_buffer_size(size) +} + +/// Sets the TCP keepalive behavior on a socket. +/// +/// This is the equivalent of setting the `SO_KEEPALIVE` and `TCP_KEEPALIVE` socket settings +/// directly. +/// +/// # Errors +/// +/// If there is an error with either enabling keepalive probes or setting the TCP keepalive idle +/// timeout on the given socket, an error variant will be returned explaining the underlying I/O +/// error. +pub fn set_keepalive(socket: &TcpStream, ttl: Duration) -> io::Result<()> { + SockRef::from(socket).set_tcp_keepalive(&TcpKeepalive::new().with_time(ttl)) +} diff --git a/src/sinks/statsd.rs b/src/sinks/statsd.rs deleted file mode 100644 index 95978bd54ec88..0000000000000 --- a/src/sinks/statsd.rs +++ /dev/null @@ -1,633 +0,0 @@ -use std::{ - fmt::Display, - net::{IpAddr, Ipv4Addr, SocketAddr}, - task::{Context, Poll}, -}; - -use bytes::{BufMut, BytesMut}; -use futures::{future, stream, SinkExt, TryFutureExt}; -use futures_util::FutureExt; -use tokio_util::codec::Encoder; -use tower::{Service, ServiceBuilder}; - -use vector_config::configurable_component; -use vector_core::ByteSizeOf; - -#[cfg(unix)] -use crate::sinks::util::unix::UnixSinkConfig; -use crate::{ - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{ - metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind}, - Event, - }, - internal_events::StatsdInvalidMetricError, - sinks::util::{ - buffer::metrics::compress_distribution, - encode_namespace, - tcp::TcpSinkConfig, - udp::{UdpService, UdpSinkConfig}, - BatchConfig, BatchSink, Buffer, Compression, EncodedEvent, - }, -}; - -use super::util::SinkBatchSettings; - -pub struct StatsdSvc { - inner: UdpService, -} - -/// Configuration for the `statsd` sink. 
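For reference, the three helpers in the new `src/net.rs` above are generic over anything a `socket2::SockRef` can borrow from. A minimal usage sketch (illustrative only, not part of this patch; the buffer sizes and keepalive interval here are arbitrary):

use std::{io, time::Duration};
use tokio::net::TcpStream;

fn tune(stream: &TcpStream) -> io::Result<()> {
    // SO_SNDBUF / SO_RCVBUF, as set by the helpers above.
    crate::net::set_send_buffer_size(stream, 256 * 1024)?;
    crate::net::set_receive_buffer_size(stream, 256 * 1024)?;
    // SO_KEEPALIVE plus the TCP keepalive idle time.
    crate::net::set_keepalive(stream, Duration::from_secs(60))
}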
-#[configurable_component(sink("statsd"))] -#[derive(Clone, Debug)] -pub struct StatsdSinkConfig { - /// Sets the default namespace for any metrics sent. - /// - /// This namespace is only used if a metric has no existing namespace. When a namespace is - /// present, it is used as a prefix to the metric name, and separated with a period (`.`). - #[serde(alias = "namespace")] - #[configurable(metadata(docs::examples = "service"))] - pub default_namespace: Option, - - #[serde(flatten)] - pub mode: Mode, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub acknowledgements: AcknowledgementsConfig, -} - -/// Socket mode. -#[configurable_component] -#[derive(Clone, Debug)] -#[serde(tag = "mode", rename_all = "snake_case")] -#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))] -pub enum Mode { - /// Send over TCP. - Tcp(TcpSinkConfig), - - /// Send over UDP. - Udp(StatsdUdpConfig), - - /// Send over a Unix domain socket (UDS). - #[cfg(unix)] - Unix(UnixSinkConfig), -} - -#[derive(Clone, Copy, Debug, Default)] -pub struct StatsdDefaultBatchSettings; - -impl SinkBatchSettings for StatsdDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1000); - const MAX_BYTES: Option = Some(1300); - const TIMEOUT_SECS: f64 = 1.0; -} - -/// UDP configuration. -#[configurable_component] -#[derive(Clone, Debug)] -pub struct StatsdUdpConfig { - #[serde(flatten)] - pub udp: UdpSinkConfig, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, -} - -fn default_address() -> SocketAddr { - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8125) -} - -impl GenerateConfig for StatsdSinkConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - default_namespace: None, - mode: Mode::Udp(StatsdUdpConfig { - batch: Default::default(), - udp: UdpSinkConfig::from_address(default_address().to_string()), - }), - acknowledgements: Default::default(), - }) - .unwrap() - } -} - -#[async_trait::async_trait] -impl SinkConfig for StatsdSinkConfig { - async fn build( - &self, - _cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let default_namespace = self.default_namespace.clone(); - let mut encoder = StatsdEncoder { default_namespace }; - match &self.mode { - Mode::Tcp(config) => config.build(Default::default(), encoder), - Mode::Udp(config) => { - // 1432 bytes is a recommended packet size to fit into MTU - // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#multi-metric-packets - // However we need to leave some space for +1 extra trailing event in the buffer. - // Also one might keep an eye on server side limitations, like - // mentioned here https://github.com/DataDog/dd-agent/issues/2638 - let batch = config.batch.into_batch_settings()?; - let (service, healthcheck) = config.udp.build_service()?; - let service = StatsdSvc { inner: service }; - let sink = BatchSink::new( - ServiceBuilder::new().service(service), - Buffer::new(batch.size, Compression::None), - batch.timeout, - ) - .sink_map_err(|error| error!(message = "Fatal statsd sink error.", %error)) - .with_flat_map(move |event: Event| { - stream::iter({ - let byte_size = event.size_of(); - let mut bytes = BytesMut::new(); - - // Errors are handled by `Encoder`. 
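// As for the 1432-byte figure cited above: back-of-the-envelope (mine, not from
// the patch), a 1500-byte Ethernet MTU less the 20-byte IPv4 and 8-byte UDP
// headers leaves 1472 bytes of datagram payload, or 1452 under IPv6's 40-byte
// header; statsd's recommended 1432 keeps extra headroom for encapsulation
// overhead, and anything larger risks IP fragmentation.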
- encoder - .encode(event, &mut bytes) - .map(|_| Ok(EncodedEvent::new(bytes, byte_size))) - }) - }); - - Ok((super::VectorSink::from_event_sink(sink), healthcheck)) - } - #[cfg(unix)] - Mode::Unix(config) => config.build(Default::default(), encoder), - } - } - - fn input(&self) -> Input { - Input::metric() - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -// Note that if multi-valued tags are present, this encoding may change the order from the input -// event, since the tags with multiple values may not have been grouped together. -// This is not an issue, but noting as it may be an observed behavior. -fn encode_tags(tags: &MetricTags) -> String { - let parts: Vec<_> = tags - .iter_all() - .map(|(name, tag_value)| match tag_value { - Some(value) => format!("{}:{}", name, value), - None => name.to_owned(), - }) - .collect(); - - // `parts` is already sorted by key because of BTreeMap - parts.join(",") -} - -fn push_event( - buf: &mut Vec, - metric: &Metric, - val: V, - metric_type: &str, - sample_rate: Option, -) { - buf.push(format!("{}:{}|{}", metric.name(), val, metric_type)); - - if let Some(sample_rate) = sample_rate { - if sample_rate != 1 { - buf.push(format!("@{}", 1.0 / f64::from(sample_rate))) - } - }; - - if let Some(t) = metric.tags() { - buf.push(format!("#{}", encode_tags(t))); - }; -} - -#[derive(Debug, Clone)] -struct StatsdEncoder { - default_namespace: Option, -} - -impl Encoder for StatsdEncoder { - type Error = codecs::encoding::Error; - - fn encode(&mut self, event: Event, bytes: &mut BytesMut) -> Result<(), Self::Error> { - let mut buf = Vec::new(); - - let metric = event.as_metric(); - match metric.value() { - MetricValue::Counter { value } => { - push_event(&mut buf, metric, value, "c", None); - } - MetricValue::Gauge { value } => { - match metric.kind() { - MetricKind::Incremental => { - push_event(&mut buf, metric, format!("{:+}", value), "g", None) - } - MetricKind::Absolute => push_event(&mut buf, metric, value, "g", None), - }; - } - MetricValue::Distribution { samples, statistic } => { - let metric_type = match statistic { - StatisticKind::Histogram => "h", - StatisticKind::Summary => "d", - }; - - // TODO: This would actually be good to potentially add a helper combinator for, in the same vein as - // `SinkBuilderExt::normalized`, that provides a metric "optimizer" for doing these sorts of things. We - // don't actually compress distributions as-is in other metrics sinks unless they use the old-style - // approach coupled with `MetricBuffer`. While not every sink would benefit from this -- the - // `datadog_metrics` sink always converts distributions to sketches anyways, for example -- a lot of - // them could. - // - // This would also imply rewriting this sink in the new style to take advantage of it. 
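For intuition, the compression step referenced in this TODO collapses duplicate sample values by summing their rates. A minimal stand-in for `compress_distribution` (a sketch that mirrors the behavior the tests below rely on, not the actual helper from `sinks::util::buffer::metrics`):

use vector_core::event::metric::Sample;

fn compress_samples(samples: &mut Vec<Sample>) -> Vec<Sample> {
    // Sort by value so equal samples become adjacent, then merge each run by
    // summing rates: `1.5 => 1, 1.5 => 1` becomes `1.5 => 2`.
    samples.sort_by(|a, b| a.value.total_cmp(&b.value));
    let mut compressed: Vec<Sample> = Vec::new();
    for sample in samples.drain(..) {
        match compressed.last_mut() {
            Some(last) if last.value == sample.value => last.rate += sample.rate,
            _ => compressed.push(sample),
        }
    }
    compressed
}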
- let mut samples = samples.clone(); - let compressed_samples = compress_distribution(&mut samples); - let mut temp_buf = Vec::new(); - for sample in compressed_samples { - push_event( - &mut temp_buf, - metric, - sample.value, - metric_type, - Some(sample.rate), - ); - let msg = encode_namespace( - metric.namespace().or(self.default_namespace.as_deref()), - '.', - temp_buf.join("|"), - ); - buf.push(msg); - temp_buf.clear() - } - } - MetricValue::Set { values } => { - for val in values { - push_event(&mut buf, metric, val, "s", None); - } - } - _ => { - emit!(StatsdInvalidMetricError { - value: metric.value(), - kind: &metric.kind(), - }); - - return Ok(()); - } - }; - - // TODO: this properly encodes aggregate histograms, but it does not handle sketches. There - // are complications with applying this to sketches, as it is required to extract the - // buckets and unpack the sketch in order to get the real values for distribution samples. - // Tracked in #11661. - let msg: String = match metric.value() { - MetricValue::Distribution { .. } => buf.join("\n"), - _ => encode_namespace( - metric.namespace().or(self.default_namespace.as_deref()), - '.', - buf.join("|"), - ), - }; - - bytes.put_slice(&msg.into_bytes()); - bytes.put_u8(b'\n'); - - Ok(()) - } -} - -impl Service for StatsdSvc { - type Response = (); - type Error = crate::Error; - type Future = future::BoxFuture<'static, Result<(), Self::Error>>; - - // Emission of Error internal event is handled upstream by the caller - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx).map_err(Into::into) - } - - // Emission of Error internal event is handled upstream by the caller - fn call(&mut self, frame: BytesMut) -> Self::Future { - self.inner.call(frame).err_into().boxed() - } -} - -#[cfg(test)] -mod test { - use bytes::Bytes; - use futures::{channel::mpsc, StreamExt, TryStreamExt}; - use tokio::net::UdpSocket; - use tokio_util::{codec::BytesCodec, udp::UdpFramed}; - use vector_core::{event::metric::TagValue, metric_tags}; - #[cfg(feature = "sources-statsd")] - use {crate::sources::statsd::parser::parse, std::str::from_utf8}; - - use super::*; - use crate::{ - event::Metric, - test_util::{ - components::{assert_sink_compliance, SINK_TAGS}, - *, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - fn tags() -> MetricTags { - metric_tags!( - "normal_tag" => "value", - "multi_value" => "true", - "multi_value" => "false", - "multi_value" => TagValue::Bare, - "bare_tag" => TagValue::Bare, - ) - } - - #[test] - fn test_encode_tags() { - let actual = encode_tags(&tags()); - let mut actual = actual.split(',').collect::>(); - actual.sort(); - - let mut expected = - "bare_tag,normal_tag:value,multi_value:true,multi_value:false,multi_value" - .split(',') - .collect::>(); - expected.sort(); - - assert_eq!(actual, expected); - } - - #[test] - fn tags_order() { - assert_eq!( - &encode_tags( - &vec![ - ("a", "value"), - ("b", "value"), - ("c", "value"), - ("d", "value"), - ("e", "value"), - ] - .into_iter() - .map(|(k, v)| (k.to_owned(), v.to_owned())) - .collect() - ), - "a:value,b:value,c:value,d:value,e:value" - ); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_counter() { - let metric1 = Metric::new( - "counter", - MetricKind::Incremental, - MetricValue::Counter { value: 1.5 }, - ) - .with_tags(Some(tags())); - let event = Event::Metric(metric1.clone()); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = 
BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); - vector_common::assert_event_data_eq!(metric1, metric2); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_absolute_counter() { - let metric1 = Metric::new( - "counter", - MetricKind::Absolute, - MetricValue::Counter { value: 1.5 }, - ); - let event = Event::Metric(metric1); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - // The statsd parser will parse the counter as Incremental, - // so we can't compare it with the parsed value. - assert_eq!("counter:1.5|c\n", from_utf8(&frame).unwrap()); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_gauge() { - let metric1 = Metric::new( - "gauge", - MetricKind::Incremental, - MetricValue::Gauge { value: -1.5 }, - ) - .with_tags(Some(tags())); - let event = Event::Metric(metric1.clone()); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); - vector_common::assert_event_data_eq!(metric1, metric2); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_absolute_gauge() { - let metric1 = Metric::new( - "gauge", - MetricKind::Absolute, - MetricValue::Gauge { value: 1.5 }, - ) - .with_tags(Some(tags())); - let event = Event::Metric(metric1.clone()); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); - vector_common::assert_event_data_eq!(metric1, metric2); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_distribution() { - let metric1 = Metric::new( - "distribution", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![1.5 => 1, 1.5 => 1], - statistic: StatisticKind::Histogram, - }, - ) - .with_tags(Some(tags())); - - let metric1_compressed = Metric::new( - "distribution", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![1.5 => 2], - statistic: StatisticKind::Histogram, - }, - ) - .with_tags(Some(tags())); - - let event = Event::Metric(metric1); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); - vector_common::assert_event_data_eq!(metric1_compressed, metric2); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_distribution_aggregated() { - let metric1 = Metric::new( - "distribution", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![2.5 => 1, 1.5 => 1, 1.5 => 1], - statistic: StatisticKind::Histogram, - }, - ) - .with_tags(Some(tags())); - - let metric1_part1_compressed = Metric::new( - "distribution", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![2.5 => 1], - statistic: StatisticKind::Histogram, - }, - ) - .with_tags(Some(tags())); - let metric1_part2_compressed = Metric::new( - "distribution", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![1.5 => 2], - statistic: StatisticKind::Histogram, - }, - ) - .with_tags(Some(tags())); - let event = 
Event::Metric(metric1); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - - let res = from_utf8(&frame).unwrap().trim(); - let mut packets = res.split('\n'); - - let metric2 = parse(packets.next().unwrap().trim()).unwrap(); - vector_common::assert_event_data_eq!(metric1_part2_compressed, metric2); - - let metric3 = parse(packets.next().unwrap().trim()).unwrap(); - vector_common::assert_event_data_eq!(metric1_part1_compressed, metric3); - } - - #[cfg(feature = "sources-statsd")] - #[test] - fn test_encode_set() { - let metric1 = Metric::new( - "set", - MetricKind::Incremental, - MetricValue::Set { - values: vec!["abc".to_owned()].into_iter().collect(), - }, - ) - .with_tags(Some(tags())); - let event = Event::Metric(metric1.clone()); - let mut encoder = StatsdEncoder { - default_namespace: None, - }; - let mut frame = BytesMut::new(); - encoder.encode(event, &mut frame).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); - - vector_common::assert_event_data_eq!(metric1, metric2); - } - - #[tokio::test] - async fn test_send_to_statsd() { - trace_init(); - - let addr = next_addr(); - let mut batch = BatchConfig::default(); - batch.max_bytes = Some(512); - - let config = StatsdSinkConfig { - default_namespace: Some("ns".into()), - mode: Mode::Udp(StatsdUdpConfig { - batch, - udp: UdpSinkConfig::from_address(addr.to_string()), - }), - acknowledgements: Default::default(), - }; - - let events = vec![ - Event::Metric( - Metric::new( - "counter", - MetricKind::Incremental, - MetricValue::Counter { value: 1.5 }, - ) - .with_namespace(Some("vector")) - .with_tags(Some(tags())), - ), - Event::Metric( - Metric::new( - "histogram", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![2.0 => 100], - statistic: StatisticKind::Histogram, - }, - ) - .with_namespace(Some("vector")), - ), - ]; - let (mut tx, rx) = mpsc::channel(0); - - let context = SinkContext::new_test(); - assert_sink_compliance(&SINK_TAGS, async move { - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let socket = UdpSocket::bind(addr).await.unwrap(); - tokio::spawn(async move { - let mut stream = UdpFramed::new(socket, BytesCodec::new()) - .map_err(|error| error!(message = "Error reading line.", %error)) - .map_ok(|(bytes, _addr)| bytes.freeze()); - - while let Some(Ok(item)) = stream.next().await { - tx.send(item).await.unwrap(); - } - }); - - sink.run(stream::iter(events).map(Into::into)) - .await - .expect("Running sink failed") - }) - .await; - - let messages = collect_n(rx, 1).await; - assert_eq!( - messages[0], - Bytes::from("vector.counter:1.5|c|#bare_tag,multi_value:true,multi_value:false,multi_value,normal_tag:value\nvector.histogram:2|h|@0.01\n"), - ); - } -} diff --git a/src/sinks/statsd/batch.rs b/src/sinks/statsd/batch.rs new file mode 100644 index 0000000000000..23504e146ca46 --- /dev/null +++ b/src/sinks/statsd/batch.rs @@ -0,0 +1,27 @@ +use vector_core::{event::Metric, stream::batcher::limiter::ItemBatchSize}; + +// This accounts for the separators, the metric type string, the length of the value itself. It can +// never be too small, as the above values will always take at least 4 bytes. +const EST_OVERHEAD_LEN: usize = 4; + +#[derive(Default)] +pub(super) struct StatsdBatchSizer; + +impl ItemBatchSize for StatsdBatchSizer { + fn size(&self, item: &Metric) -> usize { + // Metric name. 
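// A worked example of this whole estimate (mine, not from the patch): for
// `vector.counter:1.5|c|#normal_tag:value` it comes to name (7) + namespace
// and its separator (6 + 1) + tag key, separator, and value (10 + 1 + 5) +
// EST_OVERHEAD_LEN (4) = 34, versus 39 bytes actually encoded, which is close
// enough for deciding when a batch is full. The name-length term follows: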
+ item.series().name().name().len() + // Metric namespace, with an additional 1 to account for the namespace separator. + + item.series().name().namespace().map(|s| s.len() + 1).unwrap_or(0) + // Metric tags, with an additional 1 per tag to account for the tag key/value separator. + + item.series().tags().map(|t| { + t.iter_all().map(|(k, v)| { + k.len() + 1 + v.map(|v| v.len()).unwrap_or(0) + }) + .sum() + }) + .unwrap_or(0) + // Estimated overhead (separators, metric value, etc) + + EST_OVERHEAD_LEN + } +} diff --git a/src/sinks/statsd/config.rs b/src/sinks/statsd/config.rs new file mode 100644 index 0000000000000..5e1052d59a8bd --- /dev/null +++ b/src/sinks/statsd/config.rs @@ -0,0 +1,163 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use async_trait::async_trait; +use vector_common::internal_event::Protocol; +use vector_config::{component::GenerateConfig, configurable_component}; +use vector_core::{ + config::{AcknowledgementsConfig, Input}, + sink::VectorSink, +}; + +use crate::{ + config::{SinkConfig, SinkContext}, + internal_events::SocketMode, + sinks::{ + util::{ + service::net::{NetworkConnector, TcpConnectorConfig, UdpConnectorConfig}, + BatchConfig, SinkBatchSettings, + }, + Healthcheck, + }, +}; + +#[cfg(unix)] +use crate::sinks::util::service::net::UnixConnectorConfig; + +use super::{request_builder::StatsdRequestBuilder, service::StatsdService, sink::StatsdSink}; + +#[derive(Clone, Copy, Debug, Default)] +pub struct StatsdDefaultBatchSettings; + +impl SinkBatchSettings for StatsdDefaultBatchSettings { + const MAX_EVENTS: Option<usize> = Some(1000); + const MAX_BYTES: Option<usize> = Some(1300); + const TIMEOUT_SECS: f64 = 1.0; +} + +/// Configuration for the `statsd` sink. +#[configurable_component(sink("statsd"))] +#[derive(Clone, Debug)] +pub struct StatsdSinkConfig { + /// Sets the default namespace for any metrics sent. + /// + /// This namespace is only used if a metric has no existing namespace. When a namespace is + /// present, it is used as a prefix to the metric name, and separated with a period (`.`). + #[serde(alias = "namespace")] + #[configurable(metadata(docs::examples = "service"))] + pub default_namespace: Option<String>, + + #[serde(flatten)] + pub mode: Mode, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig<StatsdDefaultBatchSettings>, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +/// Socket mode. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(tag = "mode", rename_all = "snake_case")] +#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))] +pub enum Mode { + /// Send over TCP. + Tcp(TcpConnectorConfig), + + /// Send over UDP. + Udp(UdpConnectorConfig), + + /// Send over a Unix domain socket (UDS). 
+ #[cfg(unix)] + Unix(UnixConnectorConfig), +} + +impl Mode { + const fn as_socket_mode(&self) -> SocketMode { + match self { + Self::Tcp(_) => SocketMode::Tcp, + Self::Udp(_) => SocketMode::Udp, + #[cfg(unix)] + Self::Unix(_) => SocketMode::Unix, + } + } + + fn as_connector(&self) -> NetworkConnector { + match self { + Self::Tcp(config) => config.as_connector(), + Self::Udp(config) => config.as_connector(), + #[cfg(unix)] + Self::Unix(config) => config.as_connector(), + } + } +} + +fn default_address() -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8125) +} + +impl GenerateConfig for StatsdSinkConfig { + fn generate_config() -> toml::Value { + let address = default_address(); + + toml::Value::try_from(Self { + default_namespace: None, + mode: Mode::Udp(UdpConnectorConfig::from_address( + address.ip().to_string(), + address.port(), + )), + batch: Default::default(), + acknowledgements: Default::default(), + }) + .unwrap() + } +} + +#[async_trait] +impl SinkConfig for StatsdSinkConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batcher_settings = self.batch.into_batcher_settings()?; + + let socket_mode = self.mode.as_socket_mode(); + let request_builder = + StatsdRequestBuilder::new(self.default_namespace.clone(), socket_mode)?; + let protocol = Protocol::from(socket_mode.as_str()); + + let connector = self.mode.as_connector(); + let service = connector.service(); + let healthcheck = connector.healthcheck(); + + let sink = StatsdSink::new( + StatsdService::from_transport(service), + batcher_settings, + request_builder, + protocol, + ); + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +#[cfg(test)] +mod test { + use super::StatsdSinkConfig; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/sinks/statsd/encoder.rs b/src/sinks/statsd/encoder.rs new file mode 100644 index 0000000000000..c75661cb97a30 --- /dev/null +++ b/src/sinks/statsd/encoder.rs @@ -0,0 +1,374 @@ +use std::{ + fmt::Display, + io::{self, Write}, +}; + +use bytes::{BufMut, BytesMut}; +use tokio_util::codec::Encoder; +use vector_core::event::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind}; + +use crate::{ + internal_events::StatsdInvalidMetricError, + sinks::util::{buffer::metrics::compress_distribution, encode_namespace}, +}; + +/// Error type for errors that can never happen, but for use with `Encoder`. +/// +/// For the StatsD encoder, the encoding operation is infallible. However, as `Encoder` requires +/// that the associated error type can be created by `From`, we can't simply use +/// `Infallible`. This type exists to bridge that gap, acting as a marker type for "we emit no +/// errors" while supporting the trait bounds on `Encoder::Error`. +#[derive(Debug)] +pub struct InfallibleIo; + +impl From for InfallibleIo { + fn from(_: io::Error) -> Self { + Self + } +} + +#[derive(Debug, Clone)] +pub(super) struct StatsdEncoder { + default_namespace: Option, +} + +impl StatsdEncoder { + /// Creates a new `StatsdEncoder` with the given default namespace, if any. 
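///
/// A doc-style sketch of the effect (example values are invented): with
/// `default_namespace: Some("service".into())`, a counter named `requests`
/// that carries no namespace of its own encodes as `service.requests:1|c`,
/// while a metric with an explicit namespace keeps it.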
+ pub const fn new(default_namespace: Option) -> Self { + Self { default_namespace } + } +} + +impl<'a> Encoder<&'a Metric> for StatsdEncoder { + type Error = InfallibleIo; + + fn encode(&mut self, metric: &'a Metric, buf: &mut BytesMut) -> Result<(), Self::Error> { + let namespace = metric.namespace().or(self.default_namespace.as_deref()); + let name = encode_namespace(namespace, '.', metric.name()); + let tags = metric.tags().map(encode_tags); + + match metric.value() { + MetricValue::Counter { value } => { + encode_and_write_single_event(buf, &name, tags.as_deref(), value, "c", None); + } + MetricValue::Gauge { value } => { + match metric.kind() { + MetricKind::Incremental => encode_and_write_single_event( + buf, + &name, + tags.as_deref(), + format!("{:+}", value), + "g", + None, + ), + MetricKind::Absolute => { + encode_and_write_single_event(buf, &name, tags.as_deref(), value, "g", None) + } + }; + } + MetricValue::Distribution { samples, statistic } => { + let metric_type = match statistic { + StatisticKind::Histogram => "h", + StatisticKind::Summary => "d", + }; + + // TODO: This would actually be good to potentially add a helper combinator for, in the same vein as + // `SinkBuilderExt::normalized`, that provides a metric "optimizer" for doing these sorts of things. We + // don't actually compress distributions as-is in other metrics sinks unless they use the old-style + // approach coupled with `MetricBuffer`. While not every sink would benefit from this -- the + // `datadog_metrics` sink always converts distributions to sketches anyways, for example -- a lot of + // them could. + let mut samples = samples.clone(); + let compressed_samples = compress_distribution(&mut samples); + for sample in compressed_samples { + encode_and_write_single_event( + buf, + &name, + tags.as_deref(), + sample.value, + metric_type, + Some(sample.rate), + ); + } + } + MetricValue::Set { values } => { + for val in values { + encode_and_write_single_event(buf, &name, tags.as_deref(), val, "s", None); + } + } + _ => { + emit!(StatsdInvalidMetricError { + value: metric.value(), + kind: metric.kind(), + }); + + return Ok(()); + } + }; + + Ok(()) + } +} + +// Note that if multi-valued tags are present, this encoding may change the order from the input +// event, since the tags with multiple values may not have been grouped together. +// This is not an issue, but noting as it may be an observed behavior. 
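// Putting `encode_tags` and `encode_and_write_single_event` below together,
// each line leaving this encoder has the shape (derived from this code, not
// quoted from any spec):
//
//   <name>:<value>|<type>[|@<sample_rate>][|#<tag>[:<value>],<tag>...]
//
// e.g. `ns.counter:1.5|c|#normal_tag:value,bare_tag` for a tagged counter.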
+fn encode_tags(tags: &MetricTags) -> String { + let parts: Vec<_> = tags + .iter_all() + .map(|(name, tag_value)| match tag_value { + Some(value) => format!("{}:{}", name, value), + None => name.to_owned(), + }) + .collect(); + + // `parts` is already sorted by key because of BTreeMap + parts.join(",") +} + +fn encode_and_write_single_event( + buf: &mut BytesMut, + metric_name: &str, + metric_tags: Option<&str>, + val: V, + metric_type: &str, + sample_rate: Option, +) { + let mut writer = buf.writer(); + + write!(&mut writer, "{}:{}|{}", metric_name, val, metric_type).unwrap(); + + if let Some(sample_rate) = sample_rate { + if sample_rate != 1 { + write!(&mut writer, "|@{}", 1.0 / f64::from(sample_rate)).unwrap(); + } + }; + + if let Some(t) = metric_tags { + write!(&mut writer, "|#{}", t).unwrap(); + }; + + writeln!(&mut writer).unwrap(); +} + +#[cfg(test)] +mod tests { + use vector_core::{ + event::{metric::TagValue, MetricTags}, + metric_tags, + }; + + use super::encode_tags; + + #[cfg(feature = "sources-statsd")] + use vector_core::event::{Metric, MetricKind, MetricValue, StatisticKind}; + + #[cfg(feature = "sources-statsd")] + fn encode_metric(metric: &Metric) -> bytes::BytesMut { + use tokio_util::codec::Encoder; + + let mut encoder = super::StatsdEncoder { + default_namespace: None, + }; + let mut frame = bytes::BytesMut::new(); + encoder.encode(metric, &mut frame).unwrap(); + frame + } + + #[cfg(feature = "sources-statsd")] + fn parse_encoded_metrics(metric: &[u8]) -> Vec { + use crate::sources::statsd::parser::parse as statsd_parse; + + let s = std::str::from_utf8(metric).unwrap().trim(); + s.split('\n') + .map(|packet| statsd_parse(packet).expect("should not fail to parse statsd packet")) + .collect() + } + + fn tags() -> MetricTags { + metric_tags!( + "normal_tag" => "value", + "multi_value" => "true", + "multi_value" => "false", + "multi_value" => TagValue::Bare, + "bare_tag" => TagValue::Bare, + ) + } + + #[test] + fn test_encode_tags() { + let actual = encode_tags(&tags()); + let mut actual = actual.split(',').collect::>(); + actual.sort(); + + let mut expected = + "bare_tag,normal_tag:value,multi_value:true,multi_value:false,multi_value" + .split(',') + .collect::>(); + expected.sort(); + + assert_eq!(actual, expected); + } + + #[test] + fn tags_order() { + assert_eq!( + &encode_tags( + &vec![ + ("a", "value"), + ("b", "value"), + ("c", "value"), + ("d", "value"), + ("e", "value"), + ] + .into_iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect() + ), + "a:value,b:value,c:value,d:value,e:value" + ); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_counter() { + let input = Metric::new( + "counter", + MetricKind::Incremental, + MetricValue::Counter { value: 1.5 }, + ) + .with_tags(Some(tags())); + + let frame = encode_metric(&input); + let mut output = parse_encoded_metrics(&frame); + vector_common::assert_event_data_eq!(input, output.remove(0)); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_absolute_counter() { + let input = Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { value: 1.5 }, + ); + + let frame = encode_metric(&input); + // The statsd parser will parse the counter as Incremental, + // so we can't compare it with the parsed value. 
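// (The wire format carries no kind marker, so the parser must assume one; by
// statsd convention a bare counter is an increment. Comparing the raw string
// sidesteps that round-trip asymmetry.)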
+ assert_eq!("counter:1.5|c\n", std::str::from_utf8(&frame).unwrap()); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_gauge() { + let input = Metric::new( + "gauge", + MetricKind::Incremental, + MetricValue::Gauge { value: -1.5 }, + ) + .with_tags(Some(tags())); + + let frame = encode_metric(&input); + let mut output = parse_encoded_metrics(&frame); + vector_common::assert_event_data_eq!(input, output.remove(0)); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_absolute_gauge() { + let input = Metric::new( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 1.5 }, + ) + .with_tags(Some(tags())); + + let frame = encode_metric(&input); + let mut output = parse_encoded_metrics(&frame); + vector_common::assert_event_data_eq!(input, output.remove(0)); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_distribution() { + let input = Metric::new( + "distribution", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![1.5 => 1, 1.5 => 1], + statistic: StatisticKind::Histogram, + }, + ) + .with_tags(Some(tags())); + + let expected = Metric::new( + "distribution", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![1.5 => 2], + statistic: StatisticKind::Histogram, + }, + ) + .with_tags(Some(tags())); + + let frame = encode_metric(&input); + let mut output = parse_encoded_metrics(&frame); + vector_common::assert_event_data_eq!(expected, output.remove(0)); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_distribution_aggregated() { + let input = Metric::new( + "distribution", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![2.5 => 1, 1.5 => 1, 1.5 => 1], + statistic: StatisticKind::Histogram, + }, + ) + .with_tags(Some(tags())); + + let expected1 = Metric::new( + "distribution", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![1.5 => 2], + statistic: StatisticKind::Histogram, + }, + ) + .with_tags(Some(tags())); + let expected2 = Metric::new( + "distribution", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![2.5 => 1], + statistic: StatisticKind::Histogram, + }, + ) + .with_tags(Some(tags())); + + let frame = encode_metric(&input); + let mut output = parse_encoded_metrics(&frame); + vector_common::assert_event_data_eq!(expected1, output.remove(0)); + vector_common::assert_event_data_eq!(expected2, output.remove(0)); + } + + #[cfg(feature = "sources-statsd")] + #[test] + fn test_encode_set() { + let input = Metric::new( + "set", + MetricKind::Incremental, + MetricValue::Set { + values: vec!["abc".to_owned()].into_iter().collect(), + }, + ) + .with_tags(Some(tags())); + + let frame = encode_metric(&input); + let mut output = parse_encoded_metrics(&frame); + vector_common::assert_event_data_eq!(input, output.remove(0)); + } +} diff --git a/src/sinks/statsd/mod.rs b/src/sinks/statsd/mod.rs new file mode 100644 index 0000000000000..73d43ff3cc163 --- /dev/null +++ b/src/sinks/statsd/mod.rs @@ -0,0 +1,12 @@ +mod batch; +mod config; +mod encoder; +mod normalizer; +mod request_builder; +mod service; +mod sink; + +#[cfg(test)] +mod tests; + +pub use self::config::StatsdSinkConfig; diff --git a/src/sinks/statsd/normalizer.rs b/src/sinks/statsd/normalizer.rs new file mode 100644 index 0000000000000..fc8a9656a636d --- /dev/null +++ b/src/sinks/statsd/normalizer.rs @@ -0,0 +1,454 @@ +use vector_core::event::{Metric, 
MetricValue}; + +use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + +#[derive(Default)] +pub(crate) struct StatsdNormalizer; + +impl MetricNormalize for StatsdNormalizer { + fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { + // We primarily care about making sure that metrics are incremental, but for gauges, we can + // handle both incremental and absolute versions during encoding. + match metric.value() { + // Pass through gauges as-is. + MetricValue::Gauge { .. } => Some(metric), + // Otherwise, ensure that it's incremental. + _ => state.make_incremental(metric), + } + } +} + +#[cfg(test)] +mod tests { + use std::fmt; + + use vector_core::event::{ + metric::{Bucket, Sample}, + Metric, MetricKind, MetricValue, StatisticKind, + }; + + use super::StatsdNormalizer; + use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + + fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { + // Generate buckets, and general statistics, for an input set of data. We only use this in + // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, + // not for most accurately/efficiently representing the input samples. + let bounds = &[ + 1.0, + 2.0, + 4.0, + 8.0, + 16.0, + 32.0, + 64.0, + 128.0, + 256.0, + 512.0, + 1024.0, + f64::INFINITY, + ]; + let mut buckets = bounds + .iter() + .map(|b| Bucket { + upper_limit: *b, + count: 0, + }) + .collect::>(); + + let mut sum = 0.0; + let mut count = 0; + for value in values { + for bucket in buckets.iter_mut() { + if *value <= bucket.upper_limit { + bucket.count += 1; + } + } + + sum += *value; + count += 1; + } + + (buckets, sum, count) + } + + fn generate_f64s(start: u16, end: u16) -> Vec { + assert!(start <= end); + let mut samples = Vec::new(); + for n in start..=end { + samples.push(f64::from(n)); + } + samples + } + + fn get_counter(value: f64, kind: MetricKind) -> Metric { + Metric::new("counter", kind, MetricValue::Counter { value }) + } + + fn get_gauge(value: f64, kind: MetricKind) -> Metric { + Metric::new("gauge", kind, MetricValue::Gauge { value }) + } + + fn get_set(values: S, kind: MetricKind) -> Metric + where + S: IntoIterator, + V: fmt::Display, + { + Metric::new( + "set", + kind, + MetricValue::Set { + values: values.into_iter().map(|i| i.to_string()).collect(), + }, + ) + } + + fn get_distribution(samples: S, kind: MetricKind) -> Metric + where + S: IntoIterator, + V: Into, + { + Metric::new( + "distribution", + kind, + MetricValue::Distribution { + samples: samples + .into_iter() + .map(|n| Sample { + value: n.into(), + rate: 1, + }) + .collect(), + statistic: StatisticKind::Histogram, + }, + ) + } + + fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric + where + S: IntoIterator, + V: Into, + { + let samples = samples.into_iter().map(Into::into).collect::>(); + let (buckets, sum, count) = buckets_from_samples(&samples); + + Metric::new( + "agg_histogram", + kind, + MetricValue::AggregatedHistogram { + buckets, + count, + sum, + }, + ) + } + + fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { + let mut metric_set = MetricSet::default(); + let mut normalizer = StatsdNormalizer::default(); + + for (input, expected) in inputs.into_iter().zip(expected_outputs) { + let result = normalizer.normalize(&mut metric_set, input); + assert_eq!(result, expected); + } + } + + #[test] + fn absolute_counter() { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, 
MetricKind::Absolute), + get_counter(second_value, MetricKind::Absolute), + ]; + + let expected_counters = vec![ + None, + Some(get_counter( + second_value - first_value, + MetricKind::Incremental, + )), + ]; + + run_comparisons(counters, expected_counters); + } + + #[test] + fn incremental_counter() { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Incremental), + ]; + + let expected_counters = counters + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(counters, expected_counters); + } + + #[test] + fn mixed_counter() { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Absolute), + get_counter(third_value, MetricKind::Absolute), + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Incremental), + get_counter(third_value, MetricKind::Incremental), + ]; + + let expected_counters = vec![ + Some(get_counter(first_value, MetricKind::Incremental)), + None, + Some(get_counter( + third_value - second_value, + MetricKind::Incremental, + )), + None, + Some(get_counter(second_value, MetricKind::Incremental)), + Some(get_counter(third_value, MetricKind::Incremental)), + ]; + + run_comparisons(counters, expected_counters); + } + + #[test] + fn absolute_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Absolute), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn incremental_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Incremental), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn mixed_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Absolute), + get_gauge(third_value, MetricKind::Absolute), + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Incremental), + get_gauge(third_value, MetricKind::Incremental), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn absolute_set() { + let sets = vec![ + get_set(1..=20, MetricKind::Absolute), + get_set(15..=25, MetricKind::Absolute), + ]; + + let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; + + run_comparisons(sets, expected_sets); + } + + #[test] + fn incremental_set() { + let sets = vec![ + get_set(1..=20, MetricKind::Incremental), + get_set(15..=25, MetricKind::Incremental), + ]; + + let expected_sets = vec![ + Some(get_set(1..=20, MetricKind::Incremental)), + Some(get_set(15..=25, MetricKind::Incremental)), + ]; + + run_comparisons(sets, expected_sets); + } + + #[test] + fn mixed_set() { + let sets = vec![ + get_set(1..=20, MetricKind::Incremental), + get_set(10..=16, MetricKind::Absolute), + 
get_set(15..=25, MetricKind::Absolute), + get_set(1..5, MetricKind::Incremental), + get_set(3..=42, MetricKind::Incremental), + ]; + + let expected_sets = vec![ + Some(get_set(1..=20, MetricKind::Incremental)), + None, + Some(get_set(17..=25, MetricKind::Incremental)), + Some(get_set(1..5, MetricKind::Incremental)), + Some(get_set(3..=42, MetricKind::Incremental)), + ]; + + run_comparisons(sets, expected_sets); + } + + #[test] + fn absolute_distribution() { + let samples1 = generate_f64s(1, 100); + let samples2 = generate_f64s(1, 125); + let expected_samples = generate_f64s(101, 125); + + let distributions = vec![ + get_distribution(samples1, MetricKind::Absolute), + get_distribution(samples2, MetricKind::Absolute), + ]; + + let expected_distributions = vec![ + None, + Some(get_distribution(expected_samples, MetricKind::Incremental)), + ]; + + run_comparisons(distributions, expected_distributions); + } + + #[test] + fn incremental_distribution() { + let samples1 = generate_f64s(1, 100); + let samples2 = generate_f64s(75, 125); + + let distributions = vec![ + get_distribution(samples1, MetricKind::Incremental), + get_distribution(samples2, MetricKind::Incremental), + ]; + + let expected_distributions = distributions.iter().cloned().map(Some).collect(); + + run_comparisons(distributions, expected_distributions); + } + + #[test] + fn mixed_distribution() { + let samples1 = generate_f64s(1, 100); + let samples2 = generate_f64s(75, 125); + let samples3 = generate_f64s(75, 187); + let samples4 = generate_f64s(22, 45); + let samples5 = generate_f64s(1, 100); + + let distributions = vec![ + get_distribution(samples1, MetricKind::Incremental), + get_distribution(samples2, MetricKind::Absolute), + get_distribution(samples3, MetricKind::Absolute), + get_distribution(samples4, MetricKind::Incremental), + get_distribution(samples5, MetricKind::Incremental), + ]; + + let expected_distributions = vec![ + Some(distributions[0].clone()), + None, + Some(get_distribution( + generate_f64s(126, 187), + MetricKind::Incremental, + )), + Some(distributions[3].clone()), + Some(distributions[4].clone()), + ]; + + run_comparisons(distributions, expected_distributions); + } + + #[test] + fn absolute_aggregated_histogram() { + let samples1 = generate_f64s(1, 100); + let samples2 = generate_f64s(1, 125); + + let agg_histograms = vec![ + get_aggregated_histogram(samples1, MetricKind::Absolute), + get_aggregated_histogram(samples2, MetricKind::Absolute), + ]; + + let expected_agg_histograms = vec![]; + + run_comparisons(agg_histograms, expected_agg_histograms); + } + + #[test] + fn incremental_aggregated_histogram() { + let samples1 = generate_f64s(1, 100); + let samples2 = generate_f64s(1, 125); + + let agg_histograms = vec![ + get_aggregated_histogram(samples1, MetricKind::Incremental), + get_aggregated_histogram(samples2, MetricKind::Incremental), + ]; + + let expected_agg_histograms = agg_histograms + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(agg_histograms, expected_agg_histograms); + } + + #[test] + fn mixed_aggregated_histogram() { + let samples1 = generate_f64s(1, 100); + let samples2 = generate_f64s(75, 125); + let samples3 = generate_f64s(75, 187); + let samples4 = generate_f64s(22, 45); + let samples5 = generate_f64s(1, 100); + + let agg_histograms = vec![ + get_aggregated_histogram(samples1, MetricKind::Incremental), + get_aggregated_histogram(samples2, MetricKind::Absolute), + get_aggregated_histogram(samples3, MetricKind::Absolute), + 
get_aggregated_histogram(samples4, MetricKind::Incremental), + get_aggregated_histogram(samples5, MetricKind::Incremental), + ]; + + let expected_agg_histograms = vec![]; + + run_comparisons(agg_histograms, expected_agg_histograms); + } +} diff --git a/src/sinks/statsd/request_builder.rs b/src/sinks/statsd/request_builder.rs new file mode 100644 index 0000000000000..9cfdf119a08d3 --- /dev/null +++ b/src/sinks/statsd/request_builder.rs @@ -0,0 +1,155 @@ +use std::convert::Infallible; + +use bytes::BytesMut; +use snafu::Snafu; +use tokio_util::codec::Encoder; +use vector_common::request_metadata::RequestMetadata; +use vector_core::event::{EventFinalizers, Finalizable, Metric}; + +use super::{encoder::StatsdEncoder, service::StatsdRequest}; +use crate::{ + internal_events::SocketMode, + sinks::util::{ + metadata::RequestMetadataBuilder, request_builder::EncodeResult, IncrementalRequestBuilder, + }, +}; + +#[derive(Debug, Snafu)] +pub enum RequestBuilderError { + #[snafu(display("Failed to build the request builder: {}", reason))] + FailedToBuild { reason: &'static str }, +} + +/// Incremental request builder specific to StatsD. +pub struct StatsdRequestBuilder { + encoder: StatsdEncoder, + request_max_size: usize, + encode_buf: BytesMut, +} + +impl StatsdRequestBuilder { + pub fn new( + default_namespace: Option, + socket_mode: SocketMode, + ) -> Result { + let encoder = StatsdEncoder::new(default_namespace); + let request_max_size = match socket_mode { + // Following the recommended advice [1], we use a datagram size that should reasonably + // fit within the MTU of the common places that Vector will run: virtual cloud networks, + // regular ol' Ethernet networks, and so on. + // + // [1]: https://github.com/statsd/statsd/blob/0de340f864/docs/metric_types.md?plain=1#L121 + SocketMode::Udp => 1432, + + // Since messages can be much bigger with TCP and Unix domain sockets, we'll give + // ourselves the chance to build bigger requests which should increase I/O efficiency. + SocketMode::Tcp | SocketMode::Unix => 8192, + }; + + Ok(Self::from_encoder_and_max_size(encoder, request_max_size)) + } + + fn from_encoder_and_max_size(encoder: StatsdEncoder, request_max_size: usize) -> Self { + Self { + encoder, + request_max_size, + encode_buf: BytesMut::with_capacity(8192), + } + } +} + +impl Clone for StatsdRequestBuilder { + fn clone(&self) -> Self { + Self::from_encoder_and_max_size(self.encoder.clone(), self.request_max_size) + } +} + +impl IncrementalRequestBuilder> for StatsdRequestBuilder { + type Metadata = (EventFinalizers, RequestMetadata); + type Payload = Vec; + type Request = StatsdRequest; + type Error = Infallible; + + fn encode_events_incremental( + &mut self, + mut input: Vec, + ) -> Vec> { + let mut results = Vec::new(); + let mut pending = None; + + let mut metrics = input.drain(..); + while metrics.len() != 0 || pending.is_some() { + let mut n = 0; + + let mut request_buf = Vec::new(); + let mut finalizers = EventFinalizers::default(); + let mut request_metadata_builder = RequestMetadataBuilder::default(); + + loop { + // Grab the previously pending metric, or the next metric from the drain. + let (mut metric, was_encoded) = match pending.take() { + Some(metric) => (metric, true), + None => match metrics.next() { + Some(metric) => (metric, false), + None => break, + }, + }; + + // Encode the metric. Once we've done that, see if it can fit into the request + // buffer without exceeding the maximum request size limit. 
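// (Rough numbers, mine rather than the patch's: against the 1432-byte UDP
// target, about forty ~35-byte counter lines fit in one datagram; a
// forty-first would overflow it and starts the next request.)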
+ // + // If it doesn't fit, we'll store this metric off to the side and break out of this + // loop, which will finalize the current request payload and store it in the vector of + // all generated requests. Otherwise, we'll merge it in and continue encoding. + // + // Crucially, we only break out if the current request payload already has data in + // it, as we need to be able to stick at least one encoded metric into each request. + if !was_encoded { + self.encode_buf.clear(); + self.encoder + .encode(&metric, &mut self.encode_buf) + .expect("encoding is infallible"); + } + + let request_buf_len = request_buf.len(); + if request_buf_len != 0 + && (request_buf_len + self.encode_buf.len() > self.request_max_size) + { + // The metric, as encoded, would cause us to exceed our maximum request size, so + // store it off to the side and finalize the current request. + pending = Some(metric); + break; + } + + // Merge the encoded metric into the request buffer and take over its event + // finalizers, etc. + request_buf.extend(&self.encode_buf[..]); + finalizers.merge(metric.take_finalizers()); + request_metadata_builder.track_event(metric); + n += 1; + } + + // If we encoded one or more metrics this pass, finalize the request. + if n > 0 { + let encode_result = EncodeResult::uncompressed(request_buf); + let request_metadata = request_metadata_builder.build(&encode_result); + + results.push(Ok(( + (finalizers, request_metadata), + encode_result.into_payload(), + ))); + } + } + + results + } + + fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request { + let (finalizers, metadata) = metadata; + StatsdRequest { + payload, + finalizers, + metadata, + } + } +} diff --git a/src/sinks/statsd/service.rs b/src/sinks/statsd/service.rs new file mode 100644 index 0000000000000..4d3c3bc78dd09 --- /dev/null +++ b/src/sinks/statsd/service.rs @@ -0,0 +1,106 @@ +use std::task::{Context, Poll}; + +use futures_util::future::BoxFuture; +use tower::Service; +use vector_common::{ + finalization::{EventFinalizers, EventStatus, Finalizable}, + internal_event::CountByteSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; +use vector_core::stream::DriverResponse; + +/// Generalized request for sending metrics to a StatsD endpoint. +#[derive(Clone, Debug)] +pub struct StatsdRequest { + pub payload: Vec, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl Finalizable for StatsdRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for StatsdRequest { + fn get_metadata(&self) -> RequestMetadata { + self.metadata + } +} + +// Placeholder response to shuttle request metadata for StatsD requests. +// +// As StatsD sends no response back to a caller, there's no success/failure to report except for raw +// I/O errors when sending the request. Primarily, this type shuttles the metadata around the +// request -- events sent, bytes sent, etc -- that is required by `Driver`. +#[derive(Debug)] +pub struct StatsdResponse { + metadata: RequestMetadata, +} + +impl DriverResponse for StatsdResponse { + fn event_status(&self) -> EventStatus { + // If we generated a response, that implies our send concluded without any I/O errors, so we + // assume things were delivered. 
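// For context, the contract being satisfied here (abridged, and assumed to
// match `vector_core::stream::DriverResponse` at the time of this patch):
//
//   pub trait DriverResponse {
//       fn event_status(&self) -> EventStatus;
//       fn events_sent(&self) -> CountByteSize;
//       fn bytes_sent(&self) -> Option<usize>;
//   }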
+ EventStatus::Delivered + } + + fn events_sent(&self) -> CountByteSize { + CountByteSize( + self.metadata.event_count(), + self.metadata.events_byte_size(), + ) + } + + fn bytes_sent(&self) -> Option<usize> { + Some(self.metadata.request_wire_size()) + } +} + +#[derive(Clone)] +pub struct StatsdService<T> { + transport: T, +} + +impl<T> StatsdService<T> { + /// Creates a new `StatsdService` with the given `transport` service. + /// + /// The `transport` service is responsible for sending the actual encoded requests to the downstream + /// endpoint. + pub const fn from_transport(transport: T) -> Self { + Self { transport } + } +} + +impl<T> Service<StatsdRequest> for StatsdService<T> +where + T: Service<Vec<u8>>, + T::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T::Future: Send + 'static, +{ + type Response = StatsdResponse; + type Error = Box<dyn std::error::Error + Send + Sync>; + type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + self.transport.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, request: StatsdRequest) -> Self::Future { + let StatsdRequest { + payload, + finalizers: _, + metadata, + } = request; + + let send_future = self.transport.call(payload); + + Box::pin(async move { + send_future + .await + .map(|_| StatsdResponse { metadata }) + .map_err(Into::into) + }) + } +} diff --git a/src/sinks/statsd/sink.rs b/src/sinks/statsd/sink.rs new file mode 100644 index 0000000000000..b17097147bed2 --- /dev/null +++ b/src/sinks/statsd/sink.rs @@ -0,0 +1,98 @@ +use std::{fmt, future::ready}; + +use async_trait::async_trait; +use futures_util::{ + stream::{self, BoxStream}, + StreamExt, +}; +use tower::Service; +use vector_common::internal_event::Protocol; +use vector_core::{ + event::Event, + sink::StreamSink, + stream::{BatcherSettings, DriverResponse}, +}; + +use crate::sinks::util::SinkBuilderExt; + +use super::{ + batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder, + service::StatsdRequest, +}; + +pub(crate) struct StatsdSink<S> { + service: S, + batch_settings: BatcherSettings, + request_builder: StatsdRequestBuilder, + protocol: Protocol, +} + +impl<S> StatsdSink<S> +where + S: Service<StatsdRequest> + Send, + S::Error: fmt::Debug + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse, +{ + /// Creates a new `StatsdSink`. + pub const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: StatsdRequestBuilder, + protocol: Protocol, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + protocol, + } + } + + async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + // Convert `Event` to `Metric` so we don't have to deal with constant conversions. + .filter_map(|event| ready(event.try_into_metric())) + // Converts absolute counters into incremental counters, but otherwise leaves everything + // else alone. The encoder will handle the difference in absolute vs incremental for + // other metric types in type-specific ways i.e. incremental gauge updates use a + // different syntax, etc. + .normalized_with_default::<StatsdNormalizer>() + .batched( + self.batch_settings + .into_item_size_config(StatsdBatchSizer::default()), + ) + // We build our requests "incrementally", which means that for a single batch of + // metrics, we might generate N requests to represent all of the metrics in the batch. + // + // We do this as for different socket modes, there are optimal request sizes to use to + // ensure the highest rate of delivery, such as staying within the MTU for UDP, etc. 
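// (Those per-mode size targets were fixed back in `StatsdRequestBuilder::new`:
// 1432-byte payloads for UDP datagrams, 8 KiB for stream-oriented TCP/UDS.)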
diff --git a/src/sinks/statsd/sink.rs b/src/sinks/statsd/sink.rs
new file mode 100644
index 0000000000000..b17097147bed2
--- /dev/null
+++ b/src/sinks/statsd/sink.rs
@@ -0,0 +1,98 @@
+use std::{fmt, future::ready};
+
+use async_trait::async_trait;
+use futures_util::{
+    stream::{self, BoxStream},
+    StreamExt,
+};
+use tower::Service;
+use vector_common::internal_event::Protocol;
+use vector_core::{
+    event::Event,
+    sink::StreamSink,
+    stream::{BatcherSettings, DriverResponse},
+};
+
+use crate::sinks::util::SinkBuilderExt;
+
+use super::{
+    batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder,
+    service::StatsdRequest,
+};
+
+pub(crate) struct StatsdSink<S> {
+    service: S,
+    batch_settings: BatcherSettings,
+    request_builder: StatsdRequestBuilder,
+    protocol: Protocol,
+}
+
+impl<S> StatsdSink<S>
+where
+    S: Service<StatsdRequest> + Send,
+    S::Error: fmt::Debug + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse,
+{
+    /// Creates a new `StatsdSink`.
+    pub const fn new(
+        service: S,
+        batch_settings: BatcherSettings,
+        request_builder: StatsdRequestBuilder,
+        protocol: Protocol,
+    ) -> Self {
+        Self {
+            service,
+            batch_settings,
+            request_builder,
+            protocol,
+        }
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        input
+            // Convert `Event` to `Metric` so we don't have to deal with constant conversions.
+            .filter_map(|event| ready(event.try_into_metric()))
+            // Converts absolute counters into incremental counters, but otherwise leaves everything
+            // else alone. The encoder will handle the difference in absolute vs incremental for
+            // other metric types in type-specific ways, e.g. incremental gauge updates use a
+            // different syntax, etc.
+            .normalized_with_default::<StatsdNormalizer>()
+            .batched(
+                self.batch_settings
+                    .into_item_size_config(StatsdBatchSizer::default()),
+            )
+            // We build our requests "incrementally", which means that for a single batch of
+            // metrics, we might generate N requests to represent all of the metrics in the batch.
+            //
+            // We do this because, for different socket modes, there are optimal request sizes to
+            // use to ensure the highest rate of delivery, such as staying within the MTU for UDP,
+            // etc.
+            .incremental_request_builder(self.request_builder)
+            // This unrolls the vector of request results that our request builder generates.
+            .flat_map(stream::iter)
+            // Generating requests _cannot_ fail, so we just unwrap our built requests.
+            .unwrap_infallible()
+            // Finally, we generate the driver, which takes our requests, sends them off, and
+            // appropriately handles finalization of the events, as well as logging/metrics, as the
+            // requests are responded to.
+            .into_driver(self.service)
+            .protocol(self.protocol)
+            .run()
+            .await
+    }
+}
+
+#[async_trait]
+impl<S> StreamSink<Event> for StatsdSink<S>
+where
+    S: Service<StatsdRequest> + Send,
+    S::Error: fmt::Debug + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse,
+{
+    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        // Rust has issues with lifetimes and generics, which `async_trait` exacerbates, so we
+        // write a normal async fn in `StatsdSink` itself, and then call out to it from this trait
+        // implementation, which makes the compiler happy.
+        self.run_inner(input).await
+    }
+}
diff --git a/src/sinks/statsd/tests.rs b/src/sinks/statsd/tests.rs
new file mode 100644
index 0000000000000..4c30a46b78551
--- /dev/null
+++ b/src/sinks/statsd/tests.rs
@@ -0,0 +1,100 @@
+use bytes::Bytes;
+use futures::{StreamExt, TryStreamExt};
+use futures_util::stream;
+use tokio::{net::UdpSocket, sync::mpsc};
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_util::{codec::BytesCodec, udp::UdpFramed};
+use vector_core::{
+    event::{metric::TagValue, Event, Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
+    metric_tags,
+};
+
+use crate::{
+    config::{SinkConfig, SinkContext},
+    sinks::{statsd::config::Mode, util::service::net::UdpConnectorConfig},
+    test_util::{
+        collect_n,
+        components::{assert_sink_compliance, SINK_TAGS},
+        next_addr, trace_init,
+    },
+};
+
+use super::StatsdSinkConfig;
+
+fn tags() -> MetricTags {
+    metric_tags!(
+        "normal_tag" => "value",
+        "multi_value" => "true",
+        "multi_value" => "false",
+        "multi_value" => TagValue::Bare,
+        "bare_tag" => TagValue::Bare,
+    )
+}
+
+#[tokio::test]
+async fn test_send_to_statsd() {
+    trace_init();
+
+    let addr = next_addr();
+
+    let config = StatsdSinkConfig {
+        default_namespace: Some("ns".into()),
+        mode: Mode::Udp(UdpConnectorConfig::from_address(
+            addr.ip().to_string(),
+            addr.port(),
+        )),
+        batch: Default::default(),
+        acknowledgements: Default::default(),
+    };
+
+    let events = vec![
+        Event::Metric(
+            Metric::new(
+                "counter",
+                MetricKind::Incremental,
+                MetricValue::Counter { value: 1.5 },
+            )
+            .with_namespace(Some("vector"))
+            .with_tags(Some(tags())),
+        ),
+        Event::Metric(
+            Metric::new(
+                "histogram",
+                MetricKind::Incremental,
+                MetricValue::Distribution {
+                    samples: vector_core::samples![2.0 => 100],
+                    statistic: StatisticKind::Histogram,
+                },
+            )
+            .with_namespace(Some("vector")),
+        ),
+    ];
+    let (tx, rx) = mpsc::channel(1);
+
+    let context = SinkContext::new_test();
+    assert_sink_compliance(&SINK_TAGS, async move {
+        let (sink, _healthcheck) = config.build(context).await.unwrap();
+
+        let socket = UdpSocket::bind(addr).await.unwrap();
+        tokio::spawn(async move {
+            let mut stream = UdpFramed::new(socket, BytesCodec::new())
+                .map_err(|error| error!(message = "Error reading line.", %error))
+                .map_ok(|(bytes, _addr)| bytes.freeze());
+
+            while let Some(Ok(item)) = stream.next().await {
+                tx.send(item).await.unwrap();
+            }
+        });
+
+        sink.run(stream::iter(events).map(Into::into))
+            .await
+            .expect("Running sink failed")
+    })
+    .await;
+
+    let messages = collect_n(ReceiverStream::new(rx), 1).await;
+    assert_eq!(
+        messages[0],
+        Bytes::from("vector.counter:1.5|c|#bare_tag,multi_value:true,multi_value:false,multi_value,normal_tag:value\nvector.histogram:2|h|@0.01\n"),
+    );
+}
diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs
index 4cbb05a6e5793..0d7af1635a4b7 100644
--- a/src/sinks/util/builder.rs
+++ b/src/sinks/util/builder.rs
@@ -1,6 +1,16 @@
-use std::{fmt, future::Future, hash::Hash, num::NonZeroUsize, pin::Pin, sync::Arc};
+use std::{
+    convert::Infallible,
+    fmt,
+    future::Future,
+    hash::Hash,
+    num::NonZeroUsize,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
 
 use futures_util::{stream::Map, Stream, StreamExt};
+use pin_project::pin_project;
 use tower::Service;
 use vector_core::{
     event::{Finalizable, Metric},
@@ -20,6 +30,16 @@ use super::{
 impl<T> SinkBuilderExt for T where T: Stream {}
 
 pub trait SinkBuilderExt: Stream {
+    /// Converts a stream of infallible results by unwrapping them.
+    ///
+    /// For a stream of `Result<T, Infallible>` items, this turns it into a stream of `T` items.
+    fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
+    where
+        Self: Stream<Item = Result<T, Infallible>> + Sized,
+    {
+        UnwrapInfallible { st: self }
+    }
+
     /// Batches the stream based on the given partitioner and batch settings.
     ///
     /// The stream will yield batches of events, with their partition key, when either a batch fills
@@ -210,3 +230,23 @@
         Driver::new(self, service)
     }
 }
+
+#[pin_project]
+pub struct UnwrapInfallible<St> {
+    #[pin]
+    st: St,
+}
+
+impl<St, T> Stream for UnwrapInfallible<St>
+where
+    St: Stream<Item = Result<T, Infallible>>,
+{
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        this.st
+            .poll_next(cx)
+            .map(|maybe| maybe.map(|result| result.unwrap()))
+    }
+}
diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs
index 54c9346bb7895..975959e56aa15 100644
--- a/src/sinks/util/metadata.rs
+++ b/src/sinks/util/metadata.rs
@@ -7,7 +7,7 @@ use vector_common::request_metadata::RequestMetadata;
 
 use super::request_builder::EncodeResult;
 
-#[derive(Default, Clone)]
+#[derive(Clone, Default)]
 pub struct RequestMetadataBuilder {
     event_count: usize,
     events_byte_size: usize,
@@ -38,9 +38,13 @@ impl RequestMetadataBuilder {
         }
     }
 
-    pub fn increment(&mut self, event_count: usize, events_byte_size: usize) {
-        self.event_count += event_count;
-        self.events_byte_size += events_byte_size;
+    pub fn track_event<E>(&mut self, event: E)
+    where
+        E: ByteSizeOf + EstimatedJsonEncodedSizeOf,
+    {
+        self.event_count += 1;
+        self.events_byte_size += event.size_of();
+        self.events_estimated_json_encoded_byte_size += event.estimated_json_encoded_size_of();
     }
 
     pub fn with_request_size(&self, size: NonZeroUsize) -> RequestMetadata {
diff --git a/src/sinks/util/service.rs b/src/sinks/util/service.rs
index db7b602d2df89..2872c4eb39f6d 100644
--- a/src/sinks/util/service.rs
+++ b/src/sinks/util/service.rs
@@ -35,6 +35,7 @@ use crate::{
 mod concurrency;
 mod health;
 mod map;
+pub mod net;
 
 pub type Svc<S, L> = RateLimit<AdaptiveConcurrencyLimit<Retry<FixedRetryPolicy<L>, Timeout<S>>, L>>;
 pub type TowerBatchedSink<S, B, L> = BatchSink<Svc<S, L>, B>;
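Stepping back to the `unwrap_infallible` combinator added in `builder.rs` above, a quick sketch of its behavior (illustrative, not from this PR; it assumes `SinkBuilderExt` is in scope): a stream of `Result<T, Infallible>` becomes a stream of plain `T`.

use std::convert::Infallible;

use futures_util::{stream, StreamExt};

use crate::sinks::util::SinkBuilderExt;

async fn demo() {
    // With the error type being `Infallible`, every item must be `Ok`, so the
    // internal unwrap can never actually panic.
    let results = stream::iter(vec![Ok::<u32, Infallible>(1), Ok(2), Ok(3)]);
    let values: Vec<u32> = results.unwrap_infallible().collect().await;
    assert_eq!(values, vec![1, 2, 3]);
}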
diff --git a/src/sinks/util/service/net/mod.rs b/src/sinks/util/service/net/mod.rs
new file mode 100644
index 0000000000000..bf571f46100f0
--- /dev/null
+++ b/src/sinks/util/service/net/mod.rs
@@ -0,0 +1,367 @@
+mod tcp;
+mod udp;
+
+#[cfg(unix)]
+mod unix;
+
+use std::{
+    io,
+    net::SocketAddr,
+    task::{ready, Context, Poll},
+    time::Duration,
+};
+
+#[cfg(unix)]
+use std::path::PathBuf;
+
+use crate::{
+    internal_events::{
+        SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
+    },
+    sinks::{util::retries::ExponentialBackoff, Healthcheck},
+};
+
+#[cfg(unix)]
+use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
+
+pub use self::tcp::TcpConnectorConfig;
+pub use self::udp::UdpConnectorConfig;
+
+#[cfg(unix)]
+pub use self::unix::{UnixConnectorConfig, UnixMode};
+
+use self::tcp::TcpConnector;
+use self::udp::UdpConnector;
+#[cfg(unix)]
+use self::unix::{UnixConnector, UnixEither};
+
+use futures_util::{future::BoxFuture, FutureExt};
+use snafu::{ResultExt, Snafu};
+use tokio::{
+    io::AsyncWriteExt,
+    net::{TcpStream, UdpSocket},
+    sync::oneshot,
+    time::sleep,
+};
+use tower::Service;
+use vector_config::configurable_component;
+use vector_core::tls::{MaybeTlsStream, TlsError};
+
+/// Hostname and port tuple.
+///
+/// Both IP addresses and hostnames/fully qualified domain names (FQDNs) are accepted formats.
+///
+/// The address _must_ include a port.
+#[configurable_component]
+#[derive(Clone, Debug)]
+#[serde(try_from = "String", into = "String")]
+#[configurable(title = "The address to connect to.")]
+#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
+#[configurable(metadata(docs::examples = "somehost:5000"))]
+struct HostAndPort {
+    /// Hostname.
+    host: String,
+
+    /// Port.
+    port: u16,
+}
+
+impl TryFrom<String> for HostAndPort {
+    type Error = String;
+
+    fn try_from(value: String) -> Result<Self, Self::Error> {
+        let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
+        let host = uri
+            .host()
+            .ok_or_else(|| "missing host".to_string())?
+            .to_string();
+        let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
+
+        Ok(Self { host, port })
+    }
+}
+
+impl From<HostAndPort> for String {
+    fn from(value: HostAndPort) -> Self {
+        format!("{}:{}", value.host, value.port)
+    }
+}
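As a quick illustration of the serde bridge above (a sketch, not part of the PR; it just exercises the `try_from`/`into` pair):

+// fn host_and_port_roundtrip() {
+//     let parsed = HostAndPort::try_from("somehost:5000".to_string()).expect("valid address");
+//     assert_eq!(parsed.host, "somehost");
+//     assert_eq!(parsed.port, 5000);
+//
+//     // A missing port is an error rather than a default, per the config contract.
+//     assert!(HostAndPort::try_from("somehost".to_string()).is_err());
+//
+//     // And the `Into<String>` side reassembles the original form.
+//     let addr: String = parsed.into();
+//     assert_eq!(addr, "somehost:5000");
+// }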
+#[derive(Debug, Snafu)]
+#[snafu(module, context(suffix(false)), visibility(pub))]
+pub enum NetError {
+    #[snafu(display("Address is invalid: {}", reason))]
+    InvalidAddress { reason: String },
+
+    #[snafu(display("Failed to resolve address: {}", source))]
+    FailedToResolve { source: crate::dns::DnsError },
+
+    #[snafu(display("No addresses returned."))]
+    NoAddresses,
+
+    #[snafu(display("Failed to configure socket: {}.", source))]
+    FailedToConfigure { source: std::io::Error },
+
+    #[snafu(display("Failed to configure TLS: {}.", source))]
+    FailedToConfigureTLS { source: TlsError },
+
+    #[snafu(display("Failed to bind socket: {}.", source))]
+    FailedToBind { source: std::io::Error },
+
+    #[snafu(display("Failed to send message: {}", source))]
+    FailedToSend { source: std::io::Error },
+
+    #[snafu(display("Failed to connect to endpoint: {}", source))]
+    FailedToConnect { source: std::io::Error },
+
+    #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
+    FailedToConnectTLS { source: TlsError },
+
+    #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
+    ServiceSocketChannelClosed,
+}
+
+enum NetworkServiceState {
+    /// The service is currently disconnected.
+    Disconnected,
+
+    /// The service is currently attempting to connect to the endpoint.
+    Connecting(BoxFuture<'static, NetworkConnection>),
+
+    /// The service is connected and idle.
+    Connected(NetworkConnection),
+
+    /// The service has an in-flight send to the socket.
+    ///
+    /// If the socket experiences an unrecoverable error during the send, `None` will be returned
+    /// over the channel to signal the need to establish a new connection rather than reusing the
+    /// existing connection.
+    Sending(oneshot::Receiver<Option<NetworkConnection>>),
+}
+
+enum NetworkConnection {
+    Tcp(MaybeTlsStream<TcpStream>),
+    Udp(UdpSocket),
+    #[cfg(unix)]
+    Unix(UnixEither),
+}
+
+impl NetworkConnection {
+    fn on_partial_send(&self, data_size: usize, sent: usize) {
+        match self {
+            // Can't "successfully" partially send with TCP: it either all eventually sends or the
+            // socket has an I/O error that kills the connection entirely.
+            Self::Tcp(_) => {}
+            Self::Udp(_) => {
+                emit!(UdpSendIncompleteError { data_size, sent });
+            }
+            #[cfg(unix)]
+            Self::Unix(_) => {
+                emit!(UnixSendIncompleteError { data_size, sent });
+            }
+        }
+    }
+
+    async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
+        match self {
+            Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
+            Self::Udp(socket) => socket.send(buf).await,
+            #[cfg(unix)]
+            Self::Unix(socket) => socket.send(buf).await,
+        }
+    }
+}
+
+enum ConnectionMetadata {
+    Tcp {
+        peer_addr: SocketAddr,
+    },
+    #[cfg(unix)]
+    Unix {
+        path: PathBuf,
+    },
+}
+
+#[derive(Clone)]
+enum ConnectorType {
+    Tcp(TcpConnector),
+    Udp(UdpConnector),
+    #[cfg(unix)]
+    Unix(UnixConnector),
+}
+
+/// A connector for generically connecting to a remote network endpoint.
+///
+/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
+#[derive(Clone)]
+pub struct NetworkConnector {
+    inner: ConnectorType,
+}
+
+impl NetworkConnector {
+    fn on_connected(&self, metadata: ConnectionMetadata) {
+        match metadata {
+            ConnectionMetadata::Tcp { peer_addr } => {
+                emit!(TcpSocketConnectionEstablished {
+                    peer_addr: Some(peer_addr)
+                });
+            }
+            #[cfg(unix)]
+            ConnectionMetadata::Unix { path } => {
+                emit!(UnixSocketConnectionEstablished { path: &path });
+            }
+        }
+    }
+
+    fn on_connection_error<E: std::error::Error>(&self, error: E) {
+        emit!(SocketOutgoingConnectionError { error });
+    }
+
+    async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
+        match &self.inner {
+            ConnectorType::Tcp(connector) => {
+                let (peer_addr, stream) = connector.connect().await?;
+
+                Ok((
+                    NetworkConnection::Tcp(stream),
+                    Some(ConnectionMetadata::Tcp { peer_addr }),
+                ))
+            }
+            ConnectorType::Udp(connector) => {
+                let socket = connector.connect().await?;
+
+                Ok((NetworkConnection::Udp(socket), None))
+            }
+            #[cfg(unix)]
+            ConnectorType::Unix(connector) => {
+                let (path, socket) = connector.connect().await?;
+
+                Ok((
+                    NetworkConnection::Unix(socket),
+                    Some(ConnectionMetadata::Unix { path }),
+                ))
+            }
+        }
+    }
+
+    async fn connect_backoff(&self) -> NetworkConnection {
+        // TODO: Make this configurable.
+        let mut backoff = ExponentialBackoff::from_millis(2)
+            .factor(250)
+            .max_delay(Duration::from_secs(60));
+
+        loop {
+            match self.connect().await {
+                Ok((connection, maybe_metadata)) => {
+                    if let Some(metadata) = maybe_metadata {
+                        self.on_connected(metadata);
+                    }
+
+                    return connection;
+                }
+                Err(error) => {
+                    self.on_connection_error(error);
+                    sleep(backoff.next().unwrap()).await;
+                }
+            }
+        }
+    }
+
+    /// Gets a `Healthcheck` based on the configured destination of this connector.
+    pub fn healthcheck(&self) -> Healthcheck {
+        let connector = self.clone();
+        Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
+    }
+
+    /// Gets a `Service` suitable for sending data to the configured destination of this connector.
+    pub fn service(&self) -> NetworkService {
+        NetworkService::new(self.clone())
+    }
+}
+
+/// A `Service` implementation for generically sending bytes to a remote peer over a network connection.
+///
+/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
+pub struct NetworkService {
+    connector: NetworkConnector,
+    state: NetworkServiceState,
+}
+
+impl NetworkService {
+    const fn new(connector: NetworkConnector) -> Self {
+        Self {
+            connector,
+            state: NetworkServiceState::Disconnected,
+        }
+    }
+}
+
+impl Service<Vec<u8>> for NetworkService {
+    type Response = usize;
+    type Error = NetError;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        loop {
+            self.state = match &mut self.state {
+                NetworkServiceState::Disconnected => {
+                    let connector = self.connector.clone();
+                    NetworkServiceState::Connecting(Box::pin(async move {
+                        connector.connect_backoff().await
+                    }))
+                }
+                NetworkServiceState::Connecting(fut) => {
+                    let socket = ready!(fut.poll_unpin(cx));
+                    NetworkServiceState::Connected(socket)
+                }
+                NetworkServiceState::Connected(_) => break,
+                NetworkServiceState::Sending(fut) => {
+                    match ready!(fut.poll_unpin(cx)) {
+                        // When a send concludes and there's an error, the request future sends
+                        // back `None`. Otherwise, it'll send back `Some(...)` with the socket.
+                        Ok(maybe_socket) => match maybe_socket {
+                            Some(socket) => NetworkServiceState::Connected(socket),
+                            None => NetworkServiceState::Disconnected,
+                        },
+                        Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
+                    }
+                }
+            };
+        }
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, buf: Vec<u8>) -> Self::Future {
+        let (tx, rx) = oneshot::channel();
+
+        let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
+        {
+            NetworkServiceState::Connected(socket) => socket,
+            _ => panic!("poll_ready must be called first"),
+        };
+
+        Box::pin(async move {
+            match socket.send(&buf).await.context(net_error::FailedToSend) {
+                Ok(sent) => {
+                    // Emit an error if we weren't able to send the entire buffer.
+                    if sent != buf.len() {
+                        socket.on_partial_send(buf.len(), sent);
+                    }
+
+                    // Send the socket back to the service, since theoretically it's still valid to
+                    // reuse given that we may have simply overrun the OS socket buffers, etc.
+                    let _ = tx.send(Some(socket));
+
+                    Ok(sent)
+                }
+                Err(e) => {
+                    // We need to signal back to the service that it needs to create a fresh socket
+                    // since this one could be tainted.
+                    let _ = tx.send(None);
+
+                    Err(e)
+                }
+            }
+        })
+    }
+}
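Putting the two pieces together, the intended call pattern looks roughly like this (a sketch using the PR's own types; the wrapper function itself is illustrative):

use tower::{Service, ServiceExt};

// Send one payload through a connector-backed service. `ready()` drives
// `poll_ready`, which (re)establishes the connection with backoff, before
// `call` performs the actual send and resolves to the byte count.
async fn send_once(connector: NetworkConnector, payload: Vec<u8>) -> Result<usize, NetError> {
    let mut service = connector.service();
    service.ready().await?.call(payload).await
}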
diff --git a/src/sinks/util/service/net/tcp.rs b/src/sinks/util/service/net/tcp.rs
new file mode 100644
index 0000000000000..5cf97cb2ca1e5
--- /dev/null
+++ b/src/sinks/util/service/net/tcp.rs
@@ -0,0 +1,101 @@
+use std::net::SocketAddr;
+
+use snafu::ResultExt;
+use tokio::net::TcpStream;
+
+use vector_config::configurable_component;
+use vector_core::{
+    tcp::TcpKeepaliveConfig,
+    tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig},
+};
+
+use crate::dns;
+
+use super::{net_error::*, ConnectorType, HostAndPort, NetError, NetworkConnector};
+
+/// TCP configuration.
+#[configurable_component]
+#[derive(Clone, Debug)]
+pub struct TcpConnectorConfig {
+    #[configurable(derived)]
+    address: HostAndPort,
+
+    #[configurable(derived)]
+    keepalive: Option<TcpKeepaliveConfig>,
+
+    /// The size of the socket's send buffer.
+    ///
+    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
+    #[configurable(metadata(docs::type_unit = "bytes"))]
+    #[configurable(metadata(docs::examples = 65536))]
+    send_buffer_size: Option<usize>,
+
+    #[configurable(derived)]
+    tls: Option<TlsEnableableConfig>,
+}
+
+impl TcpConnectorConfig {
+    pub const fn from_address(host: String, port: u16) -> Self {
+        Self {
+            address: HostAndPort { host, port },
+            keepalive: None,
+            send_buffer_size: None,
+            tls: None,
+        }
+    }
+
+    /// Creates a [`NetworkConnector`] from this TCP connector configuration.
+    pub fn as_connector(&self) -> NetworkConnector {
+        NetworkConnector {
+            inner: ConnectorType::Tcp(TcpConnector {
+                address: self.address.clone(),
+                keepalive: self.keepalive,
+                send_buffer_size: self.send_buffer_size,
+                tls: self.tls.clone(),
+            }),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(super) struct TcpConnector {
+    address: HostAndPort,
+    keepalive: Option<TcpKeepaliveConfig>,
+    send_buffer_size: Option<usize>,
+    tls: Option<TlsEnableableConfig>,
+}
+
+impl TcpConnector {
+    pub(super) async fn connect(
+        &self,
+    ) -> Result<(SocketAddr, MaybeTlsStream<TcpStream>), NetError> {
+        let ip = dns::Resolver
+            .lookup_ip(self.address.host.clone())
+            .await
+            .context(FailedToResolve)?
+            .next()
+            .ok_or(NetError::NoAddresses)?;
+
+        let addr = SocketAddr::new(ip, self.address.port);
+
+        let tls = MaybeTlsSettings::from_config(&self.tls, false).context(FailedToConfigureTLS)?;
+        let mut stream = tls
+            .connect(self.address.host.as_str(), &addr)
+            .await
+            .context(FailedToConnectTLS)?;
+
+        if let Some(send_buffer_size) = self.send_buffer_size {
+            if let Err(error) = stream.set_send_buffer_bytes(send_buffer_size) {
+                warn!(%error, "Failed configuring send buffer size on TCP socket.");
+            }
+        }
+
+        if let Some(keepalive) = self.keepalive {
+            if let Err(error) = stream.set_keepalive(keepalive) {
+                warn!(%error, "Failed configuring keepalive on TCP socket.");
+            }
+        }
+
+        Ok((addr, stream))
+    }
+}
diff --git a/src/sinks/util/service/net/udp.rs b/src/sinks/util/service/net/udp.rs
new file mode 100644
index 0000000000000..d2655a409b008
--- /dev/null
+++ b/src/sinks/util/service/net/udp.rs
@@ -0,0 +1,83 @@
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
+
+use snafu::ResultExt;
+use tokio::net::UdpSocket;
+
+use vector_config::configurable_component;
+
+use crate::{dns, net};
+
+use super::{net_error::*, ConnectorType, HostAndPort, NetError, NetworkConnector};
+
+/// UDP configuration.
+#[configurable_component]
+#[derive(Clone, Debug)]
+pub struct UdpConnectorConfig {
+    #[configurable(derived)]
+    address: HostAndPort,
+
+    /// The size of the socket's send buffer.
+    ///
+    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
+    #[configurable(metadata(docs::type_unit = "bytes"))]
+    #[configurable(metadata(docs::examples = 65536))]
+    send_buffer_size: Option<usize>,
+}
+
+impl UdpConnectorConfig {
+    pub const fn from_address(host: String, port: u16) -> Self {
+        Self {
+            address: HostAndPort { host, port },
+            send_buffer_size: None,
+        }
+    }
+
+    /// Creates a [`NetworkConnector`] from this UDP connector configuration.
+    pub fn as_connector(&self) -> NetworkConnector {
+        NetworkConnector {
+            inner: ConnectorType::Udp(UdpConnector {
+                address: self.address.clone(),
+                send_buffer_size: self.send_buffer_size,
+            }),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(super) struct UdpConnector {
+    address: HostAndPort,
+    send_buffer_size: Option<usize>,
+}
+
+impl UdpConnector {
+    pub(super) async fn connect(&self) -> Result<UdpSocket, NetError> {
+        let ip = dns::Resolver
+            .lookup_ip(self.address.host.clone())
+            .await
+            .context(FailedToResolve)?
+            .next()
+            .ok_or(NetError::NoAddresses)?;
+
+        let addr = SocketAddr::new(ip, self.address.port);
+        let bind_address = find_bind_address(&addr);
+
+        let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;
+
+        if let Some(send_buffer_size) = self.send_buffer_size {
+            if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_size) {
+                warn!(%error, "Failed configuring send buffer size on UDP socket.");
+            }
+        }
+
+        socket.connect(addr).await.context(FailedToConnect)?;
+
+        Ok(socket)
+    }
+}
+
+fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
+    match remote_addr {
+        SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
+        SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
+    }
+}
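A small sketch of what `find_bind_address` produces (illustrative; the point is that the bind side matches the address family of the remote and lets the OS pick the source port):

use std::net::SocketAddr;

fn demo() {
    let remote: SocketAddr = "127.0.0.1:8125".parse().unwrap();
    assert_eq!(find_bind_address(&remote), "0.0.0.0:0".parse::<SocketAddr>().unwrap());

    let remote_v6: SocketAddr = "[::1]:8125".parse().unwrap();
    assert_eq!(find_bind_address(&remote_v6), "[::]:0".parse::<SocketAddr>().unwrap());
}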
diff --git a/src/sinks/util/service/net/unix.rs b/src/sinks/util/service/net/unix.rs
new file mode 100644
index 0000000000000..5bd1e4219d7a8
--- /dev/null
+++ b/src/sinks/util/service/net/unix.rs
@@ -0,0 +1,134 @@
+use std::{
+    io,
+    os::fd::{AsFd, BorrowedFd},
+    path::{Path, PathBuf},
+};
+
+use snafu::ResultExt;
+use tokio::{
+    io::AsyncWriteExt,
+    net::{UnixDatagram, UnixStream},
+};
+
+use vector_config::configurable_component;
+
+use crate::net;
+
+use super::{net_error::*, ConnectorType, NetError, NetworkConnector};
+
+/// Unix socket modes.
+#[configurable_component]
+#[derive(Clone, Copy, Debug)]
+pub enum UnixMode {
+    /// Datagram-oriented (`SOCK_DGRAM`).
+    Datagram,
+
+    /// Stream-oriented (`SOCK_STREAM`).
+    Stream,
+}
+
+/// Unix Domain Socket configuration.
+#[configurable_component]
+#[derive(Clone, Debug)]
+pub struct UnixConnectorConfig {
+    /// The Unix socket path.
+    ///
+    /// This should be an absolute path.
+    #[configurable(metadata(docs::examples = "/path/to/socket"))]
+    path: PathBuf,
+
+    /// The Unix socket mode to use.
+    #[serde(default = "default_unix_mode")]
+    unix_mode: UnixMode,
+
+    /// The size of the socket's send buffer.
+    ///
+    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
+    #[configurable(metadata(docs::type_unit = "bytes"))]
+    #[configurable(metadata(docs::examples = 65536))]
+    send_buffer_size: Option<usize>,
+}
+
+const fn default_unix_mode() -> UnixMode {
+    UnixMode::Stream
+}
+
+impl UnixConnectorConfig {
+    pub fn from_path<P: AsRef<Path>>(path: P) -> Self {
+        Self {
+            path: path.as_ref().to_path_buf(),
+            unix_mode: UnixMode::Stream,
+            send_buffer_size: None,
+        }
+    }
+
+    /// Creates a [`NetworkConnector`] from this Unix Domain Socket connector configuration.
+    pub fn as_connector(&self) -> NetworkConnector {
+        NetworkConnector {
+            inner: ConnectorType::Unix(UnixConnector {
+                path: self.path.clone(),
+                mode: self.unix_mode,
+                send_buffer_size: self.send_buffer_size,
+            }),
+        }
+    }
+}
+
+pub(super) enum UnixEither {
+    Datagram(UnixDatagram),
+    Stream(UnixStream),
+}
+
+impl UnixEither {
+    pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
+        match self {
+            Self::Datagram(datagram) => datagram.send(buf).await,
+            Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
+        }
+    }
+}
+
+impl AsFd for UnixEither {
+    fn as_fd(&self) -> BorrowedFd<'_> {
+        match self {
+            Self::Datagram(datagram) => datagram.as_fd(),
+            Self::Stream(stream) => stream.as_fd(),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(super) struct UnixConnector {
+    path: PathBuf,
+    mode: UnixMode,
+    send_buffer_size: Option<usize>,
+}
+
+impl UnixConnector {
+    pub(super) async fn connect(&self) -> Result<(PathBuf, UnixEither), NetError> {
+        let either_socket = match self.mode {
+            UnixMode::Datagram => {
+                UnixDatagram::unbound()
+                    .context(FailedToBind)
+                    .and_then(|datagram| {
+                        datagram
+                            .connect(&self.path)
+                            .context(FailedToConnect)
+                            .map(|_| UnixEither::Datagram(datagram))
+                    })?
+            }
+            UnixMode::Stream => UnixStream::connect(&self.path)
+                .await
+                .context(FailedToConnect)
+                .map(UnixEither::Stream)?,
+        };
+
+        if let Some(send_buffer_size) = self.send_buffer_size {
+            if let Err(error) = net::set_send_buffer_size(&either_socket, send_buffer_size) {
+                warn!(%error, "Failed configuring send buffer size on Unix socket.");
+            }
+        }
+
+        Ok((self.path.clone(), either_socket))
+    }
+}
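All three connector configs expose the same `as_connector()` entry point, so a sink healthcheck falls out for free. A sketch (the socket path and function name are illustrative):

use crate::sinks::Healthcheck;

// Healthchecking a Unix-socket destination reduces to "can we connect once?".
fn unix_healthcheck() -> Healthcheck {
    UnixConnectorConfig::from_path("/var/run/statsd.sock")
        .as_connector()
        .healthcheck()
}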
diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs
index 4899a66b84959..890f2f10d0154 100644
--- a/src/sinks/util/udp.rs
+++ b/src/sinks/util/udp.rs
@@ -1,17 +1,15 @@
 use std::{
     net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
     pin::Pin,
-    task::{ready, Context, Poll},
     time::Duration,
 };
 
 use async_trait::async_trait;
 use bytes::BytesMut;
-use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt};
+use futures::{stream::BoxStream, FutureExt, StreamExt};
 use snafu::{ResultExt, Snafu};
-use tokio::{net::UdpSocket, sync::oneshot, time::sleep};
+use tokio::{net::UdpSocket, time::sleep};
 use tokio_util::codec::Encoder;
-use tower::Service;
 use vector_common::internal_event::{
     ByteSize, BytesSent, InternalEventHandle, Protocol, Registered,
 };
@@ -27,27 +25,23 @@ use crate::{
         SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError,
         UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError,
     },
+    net,
     sinks::{
         util::{retries::ExponentialBackoff, StreamSink},
         Healthcheck, VectorSink,
     },
-    udp,
 };
 
 #[derive(Debug, Snafu)]
 pub enum UdpError {
     #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
     BindError { source: std::io::Error },
-    #[snafu(display("Send error: {}", source))]
-    SendError { source: std::io::Error },
     #[snafu(display("Connect error: {}", source))]
     ConnectError { source: std::io::Error },
     #[snafu(display("No addresses returned."))]
     NoAddresses,
     #[snafu(display("Unable to resolve DNS: {}", source))]
     DnsError { source: crate::dns::DnsError },
-    #[snafu(display("Failed to get UdpSocket back: {}", source))]
-    ServiceChannelRecvError { source: oneshot::error::RecvError },
 }
 
 /// A UDP sink.
@@ -86,14 +80,6 @@ impl UdpSinkConfig {
         Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
     }
 
-    pub fn build_service(&self) -> crate::Result<(UdpService, Healthcheck)> {
-        let connector = self.build_connector()?;
-        Ok((
-            UdpService::new(connector.clone()),
-            async move { connector.healthcheck().await }.boxed(),
-        ))
-    }
-
     pub fn build(
         &self,
         transformer: Transformer,
@@ -145,7 +131,7 @@ impl UdpConnector {
         let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
 
         if let Some(send_buffer_bytes) = self.send_buffer_bytes {
-            if let Err(error) = udp::set_send_buffer_size(&socket, send_buffer_bytes) {
+            if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes) {
                 warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
             }
         }
@@ -176,92 +162,6 @@ impl UdpConnector {
     }
 }
 
-enum UdpServiceState {
-    Disconnected,
-    Connecting(BoxFuture<'static, UdpSocket>),
-    Connected(UdpSocket),
-    Sending(oneshot::Receiver<UdpSocket>),
-}
-
-pub struct UdpService {
-    connector: UdpConnector,
-    state: UdpServiceState,
-    bytes_sent: Registered<BytesSent>,
-}
-
-impl UdpService {
-    fn new(connector: UdpConnector) -> Self {
-        Self {
-            connector,
-            state: UdpServiceState::Disconnected,
-            bytes_sent: register!(BytesSent::from(Protocol::UDP)),
-        }
-    }
-}
-
-impl Service<BytesMut> for UdpService {
-    type Response = ();
-    type Error = UdpError;
-    type Future = BoxFuture<'static, Result<(), Self::Error>>;
-
-    // Emission of an internal event in case of errors is handled upstream by the caller.
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        loop {
-            self.state = match &mut self.state {
-                UdpServiceState::Disconnected => {
-                    let connector = self.connector.clone();
-                    UdpServiceState::Connecting(Box::pin(async move {
-                        connector.connect_backoff().await
-                    }))
-                }
-                UdpServiceState::Connecting(fut) => {
-                    let socket = ready!(fut.poll_unpin(cx));
-                    UdpServiceState::Connected(socket)
-                }
-                UdpServiceState::Connected(_) => break,
-                UdpServiceState::Sending(fut) => {
-                    let socket = match ready!(fut.poll_unpin(cx)).context(ServiceChannelRecvSnafu) {
-                        Ok(socket) => socket,
-                        Err(error) => return Poll::Ready(Err(error)),
-                    };
-                    UdpServiceState::Connected(socket)
-                }
-            };
-        }
-        Poll::Ready(Ok(()))
-    }
-
-    // Emission of internal events for errors and dropped events is handled upstream by the caller.
-    fn call(&mut self, msg: BytesMut) -> Self::Future {
-        let (sender, receiver) = oneshot::channel();
-        let byte_size = msg.len();
-        let bytes_sent = self.bytes_sent.clone();
-
-        let mut socket =
-            match std::mem::replace(&mut self.state, UdpServiceState::Sending(receiver)) {
-                UdpServiceState::Connected(socket) => socket,
-                _ => panic!("UdpService::poll_ready should be called first"),
-            };
-
-        Box::pin(async move {
-            // TODO: Add reconnect support as TCP/Unix?
-            let result = udp_send(&mut socket, &msg).await.context(SendSnafu);
-            _ = sender.send(socket);
-
-            if result.is_ok() {
-                // NOTE: This is obviously not happening before things like compression, etc, so it's currently a
-                // stopgap for the `socket` and `statsd` sinks, and potentially others, to ensure that we're at least
-                // emitting the `BytesSent` event, and related metrics... and practically, those sinks don't compress
-                // anyways, so the metrics are correct as-is... they just may not be correct in the future if
-                // compression support was added, etc.
-                bytes_sent.emit(ByteSize(byte_size));
-            }
-
-            result
-        })
-    }
-}
-
 struct UdpSink<E>
 where
     E: Encoder<Event, Error = codecs::encoding::Error> + Clone + Send + Sync,
diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs
index 50fd6a7f6f408..1671be995a299 100644
--- a/src/sources/socket/udp.rs
+++ b/src/sources/socket/udp.rs
@@ -22,6 +22,7 @@ use crate::{
     internal_events::{
         SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
     },
+    net,
     serde::{default_decoding, default_framing_message_based},
     shutdown::ShutdownSignal,
     sources::{
@@ -29,7 +30,7 @@ use crate::{
         util::net::{try_bind_udp_socket, SocketListenAddr},
         Source,
     },
-    udp, SourceSender,
+    SourceSender,
 };
 
 /// UDP configuration for the `socket` source.
@@ -158,7 +159,7 @@ pub(super) fn udp(
     })?;
 
     if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
-        if let Err(error) = udp::set_receive_buffer_size(&socket, receive_buffer_bytes) {
+        if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
             warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
         }
     }
diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs
index 8fcb93e176e4b..4f1a09503492c 100644
--- a/src/sources/statsd/mod.rs
+++ b/src/sources/statsd/mod.rs
@@ -27,10 +27,11 @@ use crate::{
         EventsReceived, SocketBindError, SocketBytesReceived, SocketMode, SocketReceiveError,
         StreamClosedError,
     },
+    net,
     shutdown::ShutdownSignal,
     tcp::TcpKeepaliveConfig,
     tls::{MaybeTlsSettings, TlsSourceConfig},
-    udp, SourceSender,
+    SourceSender,
 };
 
 pub mod parser;
@@ -273,7 +274,7 @@ async fn statsd_udp(
         .await?;
 
     if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
-        if let Err(error) = udp::set_receive_buffer_size(&socket, receive_buffer_bytes) {
+        if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
             warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
         }
     }
diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs
index 6bd7fd3c6df7d..9fdc84e274b40 100644
--- a/src/sources/syslog.rs
+++ b/src/sources/syslog.rs
@@ -29,11 +29,12 @@ use crate::{
     event::Event,
     internal_events::StreamClosedError,
     internal_events::{SocketBindError, SocketMode, SocketReceiveError},
+    net,
     shutdown::ShutdownSignal,
     sources::util::net::{try_bind_udp_socket, SocketListenAddr, TcpNullAcker, TcpSource},
     tcp::TcpKeepaliveConfig,
     tls::{MaybeTlsSettings, TlsSourceConfig},
-    udp, SourceSender,
+    SourceSender,
 };
 
 /// Configuration for the `syslog` source.
@@ -318,7 +319,7 @@ pub fn udp(
     })?;
 
     if let Some(receive_buffer_bytes) = receive_buffer_bytes {
-        if let Err(error) = udp::set_receive_buffer_size(&socket, receive_buffer_bytes) {
+        if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
             warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
         }
     }
diff --git a/src/udp.rs b/src/udp.rs
deleted file mode 100644
index 522a7f2cf1169..0000000000000
--- a/src/udp.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-#![allow(missing_docs)]
-use socket2::SockRef;
-use tokio::net::UdpSocket;
-
-// This function will be obsolete after tokio/mio internally use `socket2` and expose the methods to
-// apply options to a socket.
-pub fn set_receive_buffer_size(socket: &UdpSocket, size: usize) -> std::io::Result<()> {
-    SockRef::from(socket).set_recv_buffer_size(size)
-}
-
-// This function will be obsolete after tokio/mio internally use `socket2` and expose the methods to
-// apply options to a socket.
-pub fn set_send_buffer_size(socket: &UdpSocket, size: usize) -> std::io::Result<()> {
-    SockRef::from(socket).set_send_buffer_size(size)
-}
diff --git a/website/cue/reference/components/sinks/base/statsd.cue b/website/cue/reference/components/sinks/base/statsd.cue
index a05e917bcd892..3a9273c8b5132 100644
--- a/website/cue/reference/components/sinks/base/statsd.cue
+++ b/website/cue/reference/components/sinks/base/statsd.cue
@@ -31,18 +31,17 @@ base: components: sinks: statsd: configuration: {
 		description: """
 			The address to connect to.
 
-			Both IP address and hostname are accepted formats.
+			Both IP addresses and hostnames/fully qualified domain names (FQDNs) are accepted formats.
 
 			The address _must_ include a port.
 			"""
 		relevant_when: "mode = \"tcp\" or mode = \"udp\""
 		required:      true
-		type: string: examples: ["92.12.333.224:5000", "https://somehost:5000"]
+		type: string: examples: ["92.12.333.224:5000", "somehost:5000"]
 	}
 	batch: {
-		description:   "Event batching behavior."
-		relevant_when: "mode = \"udp\""
-		required:      false
+		description: "Event batching behavior."
+		required:    false
 		type: object: options: {
 			max_bytes: {
 				description: """
@@ -114,14 +113,13 @@ base: components: sinks: statsd: configuration: {
 		required: true
 		type: string: examples: ["/path/to/socket"]
 	}
-	send_buffer_bytes: {
+	send_buffer_size: {
 		description: """
 			The size of the socket's send buffer.
 
 			If set, the value of the setting is passed via the `SO_SNDBUF` option.
 			"""
-		relevant_when: "mode = \"tcp\" or mode = \"udp\""
-		required:      false
+		required: false
 		type: uint: {
 			examples: [
 				65536,
@@ -225,4 +223,16 @@ base: components: sinks: statsd: configuration: {
 			}
 		}
 	}
+	unix_mode: {
+		description:   "The Unix socket mode to use."
+		relevant_when: "mode = \"unix\""
+		required:      false
+		type: string: {
+			default: "Stream"
+			enum: {
+				Datagram: "Datagram-oriented (`SOCK_DGRAM`)."
+				Stream:   "Stream-oriented (`SOCK_STREAM`)."
+			}
+		}
+	}
 }