Skip to content

Commit 6037898

Browse files
committed
lot o cleanup
1 parent b32e933 commit 6037898

File tree

6 files changed

+71
-84
lines changed

6 files changed

+71
-84
lines changed

src/net.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! Networking-related helper functions.
2+
13
use std::{io, os::fd::AsRawFd, time::Duration};
24

35
use socket2::{SockRef, TcpKeepalive};

src/sinks/statsd/tests.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use tokio::{net::UdpSocket, sync::mpsc};
55
use tokio_stream::wrappers::ReceiverStream;
66
use tokio_util::{codec::BytesCodec, udp::UdpFramed};
77
use vector_core::{
8-
event::{
9-
metric::TagValue, Event, Metric, MetricKind, MetricTags, MetricValue, StatisticKind,
10-
},
8+
event::{metric::TagValue, Event, Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
119
metric_tags,
1210
};
1311

src/sinks/util/service/net/mod.rs

+20
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ pub use self::tcp::{TcpConnector, TcpConnectorConfig};
66
pub use self::udp::{UdpConnector, UdpConnectorConfig};
77
pub use self::unix::{UnixConnector, UnixConnectorConfig, UnixMode};
88

9+
use futures_util::future::BoxFuture;
910
use snafu::Snafu;
11+
use tokio::sync::oneshot;
1012
use vector_config::configurable_component;
1113

1214
/// Hostname and port tuple.
@@ -71,3 +73,21 @@ pub enum NetError {
7173
#[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
7274
ServiceSocketChannelClosed,
7375
}
76+
77+
pub enum ServiceState<C> {
78+
/// The service is currently disconnected.
79+
Disconnected,
80+
81+
/// The service is currently attempting to connect to the endpoint.
82+
Connecting(BoxFuture<'static, C>),
83+
84+
/// The service is connected and idle.
85+
Connected(C),
86+
87+
/// The service has an in-flight send to the socket.
88+
///
89+
/// If the socket experiences an unrecoverable error during the send, `None` will be returned
90+
/// over the channel to signal the need to establish a new connection rather than reusing the
91+
/// existing connection.
92+
Sending(oneshot::Receiver<Option<C>>),
93+
}

src/sinks/util/service/net/tcp.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
time::Duration,
55
};
66

7-
use futures::{future::BoxFuture, FutureExt};
7+
use futures::future::BoxFuture;
8+
use futures_util::FutureExt;
89
use snafu::ResultExt;
910
use tokio::{
1011
io::AsyncWriteExt,
@@ -24,7 +25,7 @@ use crate::{
2425
sinks::{util::retries::ExponentialBackoff, Healthcheck},
2526
};
2627

27-
use super::{HostAndPort, NetError, net_error::*};
28+
use super::{net_error::*, HostAndPort, NetError};
2829

2930
/// `TcpConnector` configuration.
3031
#[configurable_component]

src/sinks/util/service/net/udp.rs

+18-37
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
time::Duration,
55
};
66

7-
use futures::{future::BoxFuture, FutureExt};
7+
use futures::future::BoxFuture;
8+
use futures_util::FutureExt;
89
use snafu::ResultExt;
910
use tokio::{net::UdpSocket, sync::oneshot, time::sleep};
1011
use tower::Service;
@@ -18,7 +19,7 @@ use crate::{
1819
sinks::{util::retries::ExponentialBackoff, Healthcheck},
1920
};
2021

21-
use super::{HostAndPort, NetError, net_error::*};
22+
use super::{net_error::*, HostAndPort, NetError, ServiceState};
2223

2324
/// `UdpConnector` configuration.
2425
#[configurable_component]
@@ -73,9 +74,7 @@ impl UdpConnector {
7374
let addr = SocketAddr::new(ip, self.address.port);
7475
let bind_address = find_bind_address(&addr);
7576

76-
let socket = UdpSocket::bind(bind_address)
77-
.await
78-
.context(FailedToBind)?;
77+
let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;
7978

8079
if let Some(send_buffer_size) = self.send_buffer_size {
8180
if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_size) {
@@ -117,34 +116,16 @@ impl UdpConnector {
117116
}
118117
}
119118

120-
enum UdpServiceState {
121-
/// The service is currently disconnected.
122-
Disconnected,
123-
124-
/// The service is currently attempting to connect to the endpoint.
125-
Connecting(BoxFuture<'static, UdpSocket>),
126-
127-
/// The service is connected and idle.
128-
Connected(UdpSocket),
129-
130-
/// The service has an in-flight send to the socket.
131-
///
132-
/// If the socket experiences an unrecoverable error during the send, `None` will be returned
133-
/// over the channel to signal the need to establish a new connection rather than reusing the
134-
/// existing connection.
135-
Sending(oneshot::Receiver<Option<UdpSocket>>),
136-
}
137-
138119
pub struct UdpService {
139120
connector: UdpConnector,
140-
state: UdpServiceState,
121+
state: ServiceState<UdpSocket>,
141122
}
142123

143124
impl UdpService {
144125
const fn new(connector: UdpConnector) -> Self {
145126
Self {
146127
connector,
147-
state: UdpServiceState::Disconnected,
128+
state: ServiceState::Disconnected,
148129
}
149130
}
150131
}
@@ -157,24 +138,24 @@ impl Service<Vec<u8>> for UdpService {
157138
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
158139
loop {
159140
self.state = match &mut self.state {
160-
UdpServiceState::Disconnected => {
141+
ServiceState::Disconnected => {
161142
let connector = self.connector.clone();
162-
UdpServiceState::Connecting(Box::pin(async move {
163-
connector.connect_backoff().await
164-
}))
143+
ServiceState::Connecting(Box::pin(
144+
async move { connector.connect_backoff().await },
145+
))
165146
}
166-
UdpServiceState::Connecting(fut) => {
147+
ServiceState::Connecting(fut) => {
167148
let socket = ready!(fut.poll_unpin(cx));
168-
UdpServiceState::Connected(socket)
149+
ServiceState::Connected(socket)
169150
}
170-
UdpServiceState::Connected(_) => break,
171-
UdpServiceState::Sending(fut) => {
151+
ServiceState::Connected(_) => break,
152+
ServiceState::Sending(fut) => {
172153
match ready!(fut.poll_unpin(cx)) {
173154
// When a send concludes, and there's an error, the request future sends
174155
// back `None`. Otherwise, it'll send back `Some(...)` with the socket.
175156
Ok(maybe_socket) => match maybe_socket {
176-
Some(socket) => UdpServiceState::Connected(socket),
177-
None => UdpServiceState::Disconnected,
157+
Some(socket) => ServiceState::Connected(socket),
158+
None => ServiceState::Disconnected,
178159
},
179160
Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
180161
}
@@ -187,8 +168,8 @@ impl Service<Vec<u8>> for UdpService {
187168
fn call(&mut self, buf: Vec<u8>) -> Self::Future {
188169
let (tx, rx) = oneshot::channel();
189170

190-
let socket = match std::mem::replace(&mut self.state, UdpServiceState::Sending(rx)) {
191-
UdpServiceState::Connected(socket) => socket,
171+
let socket = match std::mem::replace(&mut self.state, ServiceState::Sending(rx)) {
172+
ServiceState::Connected(socket) => socket,
192173
_ => panic!("poll_ready must be called first"),
193174
};
194175

src/sinks/util/service/net/unix.rs

+27-42
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use std::{
66
time::Duration,
77
};
88

9-
use futures::{future::BoxFuture, FutureExt};
9+
use futures::future::BoxFuture;
10+
use futures_util::FutureExt;
1011
use snafu::ResultExt;
1112
use tokio::{
1213
io::AsyncWriteExt,
@@ -26,7 +27,7 @@ use crate::{
2627
sinks::{util::retries::ExponentialBackoff, Healthcheck},
2728
};
2829

29-
use super::{NetError, net_error::*};
30+
use super::{net_error::*, NetError, ServiceState};
3031

3132
/// Unix socket modes.
3233
#[configurable_component]
@@ -120,14 +121,16 @@ pub struct UnixConnector {
120121
impl UnixConnector {
121122
async fn connect(&self) -> Result<(PathBuf, UnixEither), NetError> {
122123
let either_socket = match self.mode {
123-
UnixMode::Datagram => UnixDatagram::unbound()
124-
.context(FailedToBind)
125-
.and_then(|datagram| {
126-
datagram
127-
.connect(&self.path)
128-
.context(FailedToConnect)
129-
.map(|_| UnixEither::Datagram(datagram))
130-
})?,
124+
UnixMode::Datagram => {
125+
UnixDatagram::unbound()
126+
.context(FailedToBind)
127+
.and_then(|datagram| {
128+
datagram
129+
.connect(&self.path)
130+
.context(FailedToConnect)
131+
.map(|_| UnixEither::Datagram(datagram))
132+
})?
133+
}
131134
UnixMode::Stream => UnixStream::connect(&self.path)
132135
.await
133136
.context(FailedToConnect)
@@ -175,34 +178,16 @@ impl UnixConnector {
175178
}
176179
}
177180

178-
enum UnixServiceState {
179-
/// The service is currently disconnected.
180-
Disconnected,
181-
182-
/// The service is currently attempting to connect to the endpoint.
183-
Connecting(BoxFuture<'static, UnixEither>),
184-
185-
/// The service is connected and idle.
186-
Connected(UnixEither),
187-
188-
/// The service has an in-flight send to the socket.
189-
///
190-
/// If the socket experiences an unrecoverable error during the send, `None` will be returned
191-
/// over the channel to signal the need to establish a new connection rather than reusing the
192-
/// existing connection.
193-
Sending(oneshot::Receiver<Option<UnixEither>>),
194-
}
195-
196181
pub struct UnixService {
197182
connector: UnixConnector,
198-
state: UnixServiceState,
183+
state: ServiceState<UnixEither>,
199184
}
200185

201186
impl UnixService {
202187
const fn new(connector: UnixConnector) -> Self {
203188
Self {
204189
connector,
205-
state: UnixServiceState::Disconnected,
190+
state: ServiceState::Disconnected,
206191
}
207192
}
208193
}
@@ -215,24 +200,24 @@ impl Service<Vec<u8>> for UnixService {
215200
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216201
loop {
217202
self.state = match &mut self.state {
218-
UnixServiceState::Disconnected => {
203+
ServiceState::Disconnected => {
219204
let connector = self.connector.clone();
220-
UnixServiceState::Connecting(Box::pin(async move {
221-
connector.connect_backoff().await
222-
}))
205+
ServiceState::Connecting(Box::pin(
206+
async move { connector.connect_backoff().await },
207+
))
223208
}
224-
UnixServiceState::Connecting(fut) => {
209+
ServiceState::Connecting(fut) => {
225210
let socket = ready!(fut.poll_unpin(cx));
226-
UnixServiceState::Connected(socket)
211+
ServiceState::Connected(socket)
227212
}
228-
UnixServiceState::Connected(_) => break,
229-
UnixServiceState::Sending(fut) => {
213+
ServiceState::Connected(_) => break,
214+
ServiceState::Sending(fut) => {
230215
match ready!(fut.poll_unpin(cx)) {
231216
// When a send concludes, and there's an error, the request future sends
232217
// back `None`. Otherwise, it'll send back `Some(...)` with the socket.
233218
Ok(maybe_socket) => match maybe_socket {
234-
Some(socket) => UnixServiceState::Connected(socket),
235-
None => UnixServiceState::Disconnected,
219+
Some(socket) => ServiceState::Connected(socket),
220+
None => ServiceState::Disconnected,
236221
},
237222
Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
238223
}
@@ -245,8 +230,8 @@ impl Service<Vec<u8>> for UnixService {
245230
fn call(&mut self, buf: Vec<u8>) -> Self::Future {
246231
let (tx, rx) = oneshot::channel();
247232

248-
let mut socket = match std::mem::replace(&mut self.state, UnixServiceState::Sending(rx)) {
249-
UnixServiceState::Connected(socket) => socket,
233+
let mut socket = match std::mem::replace(&mut self.state, ServiceState::Sending(rx)) {
234+
ServiceState::Connected(socket) => socket,
250235
_ => panic!("poll_ready must be called first"),
251236
};
252237

0 commit comments

Comments
 (0)