Skip to content

Commit e11b2ad

Browse files
authored
refactor(lib): drop futures-util except in ffi (#3890)
Make hyper usable for h1/h2 and client/server without this heavyweight dependency. It's about 17k lines of code and takes up to 1.7 seconds to compile on my machine, but hyper is only using a tiny fraction of it. Larger applications probably still pull in futures-util by other means, but it's no longer as unavoidable as in the early days of the ecosystem. To remove futures-util without raising MSRV, I took these steps: * When futures-util just re-exports something from its dependencies, use it directly from the source. * Inline trivial helpers like `poll_unpin` that "only" communicate intent a little better but don't save any significant amount of code. * Refactor the h2 client code to avoid `StreamFuture` for the "Client has been dropped" detection -- just poll the mpsc channel directly. * Implement a couple of small helpers from scratch when they're straightforward and fit on one screen each. The majority of this is polyfills for standard library APIs that would require a higher MSRV. * Use `AtomicWaker` from the `atomic-waker` crate, a separately published copy of the futures-util type of the same name. While the two crates are owned by different organizations (smol-rs vs. rust-lang), it's mostly the same people maintaining both copies. The uses of future-util in hyper's tests/benches/examples and in the `ffi` module seem much harder to remove entirely, so I did not touch those modules at all.
1 parent 42aff87 commit e11b2ad

20 files changed

+155
-37
lines changed

Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ tokio = { version = "1", features = ["sync"] }
2727

2828
# Optional
2929

30+
atomic-waker = { version = "1.1.2", optional = true }
3031
futures-channel = { version = "0.3", optional = true }
31-
futures-util = { version = "0.3", default-features = false, optional = true }
32+
futures-core = { version = "0.3.31", optional = true }
33+
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
3234
h2 = { version = "0.4.2", optional = true }
3335
http-body-util = { version = "0.1", optional = true }
3436
httparse = { version = "1.9", optional = true }
3537
httpdate = { version = "1.0", optional = true }
3638
itoa = { version = "1", optional = true }
3739
pin-project-lite = { version = "0.2.4", optional = true }
40+
pin-utils = { version = "0.1", optional = true } # TODO: replace with std::pin::pin! once MSRV >= 1.68
3841
smallvec = { version = "1.12", features = ["const_generics", "const_new"], optional = true }
3942
tracing = { version = "0.1", default-features = false, features = ["std"], optional = true }
4043
want = { version = "0.3", optional = true }
@@ -77,15 +80,15 @@ full = [
7780
]
7881

7982
# HTTP versions
80-
http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"]
81-
http2 = ["dep:futures-channel", "dep:futures-util", "dep:h2"]
83+
http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
84+
http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"]
8285

8386
# Client/Server
8487
client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"]
8588
server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"]
8689

8790
# C-API support (currently unstable (no semver))
88-
ffi = ["dep:http-body-util", "futures-util?/alloc"]
91+
ffi = ["dep:http-body-util", "futures-util"]
8992
capi = []
9093

9194
# Utilize tracing (currently unstable)

src/body/incoming.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use futures_channel::{mpsc, oneshot};
1111
any(feature = "http1", feature = "http2"),
1212
any(feature = "client", feature = "server")
1313
))]
14-
use futures_util::ready;
14+
use futures_core::ready;
1515
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16-
use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
16+
use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
1717
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
1818
use http::HeaderMap;
1919
use http_body::{Body, Frame, SizeHint};

src/client/conn/http1.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::task::{Context, Poll};
88

99
use crate::rt::{Read, Write};
1010
use bytes::Bytes;
11-
use futures_util::ready;
11+
use futures_core::ready;
1212
use http::{Request, Response};
1313
use httparse::ParserConfig;
1414

@@ -92,7 +92,7 @@ where
9292
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
9393
pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
9494
let mut conn = Some(self);
95-
futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
95+
crate::common::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
9696
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
9797
Poll::Ready(Ok(conn.take().unwrap().into_parts()))
9898
})
@@ -148,7 +148,7 @@ impl<B> SendRequest<B> {
148148
///
149149
/// If the associated connection is closed, this returns an Error.
150150
pub async fn ready(&mut self) -> crate::Result<()> {
151-
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
151+
crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
152152
}
153153

154154
/// Checks if the connection is currently ready to send a request.

src/client/conn/http2.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::task::{Context, Poll};
1010
use std::time::Duration;
1111

1212
use crate::rt::{Read, Write};
13-
use futures_util::ready;
13+
use futures_core::ready;
1414
use http::{Request, Response};
1515

1616
use super::super::dispatch::{self, TrySendError};
@@ -99,7 +99,7 @@ impl<B> SendRequest<B> {
9999
///
100100
/// If the associated connection is closed, this returns an Error.
101101
pub async fn ready(&mut self) -> crate::Result<()> {
102-
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
102+
crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
103103
}
104104

105105
/// Checks if the connection is currently ready to send a request.

src/client/dispatch.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,7 @@ impl<T, U> Receiver<T, U> {
199199

200200
#[cfg(feature = "http1")]
201201
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
202-
use futures_util::FutureExt;
203-
match self.inner.recv().now_or_never() {
202+
match crate::common::task::now_or_never(self.inner.recv()) {
204203
Some(Some(mut env)) => env.0.take(),
205204
_ => None,
206205
}

src/common/either.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use pin_project_lite::pin_project;
2+
use std::{
3+
future::Future,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
};
7+
8+
pin_project! {
9+
/// One of two possible futures that have the same output type.
10+
#[project = EitherProj]
11+
pub(crate) enum Either<F1, F2> {
12+
Left {
13+
#[pin]
14+
fut: F1
15+
},
16+
Right {
17+
#[pin]
18+
fut: F2,
19+
},
20+
}
21+
}
22+
23+
impl<F1, F2> Either<F1, F2> {
24+
pub(crate) fn left(fut: F1) -> Self {
25+
Either::Left { fut }
26+
}
27+
28+
pub(crate) fn right(fut: F2) -> Self {
29+
Either::Right { fut }
30+
}
31+
}
32+
33+
impl<F1, F2> Future for Either<F1, F2>
34+
where
35+
F1: Future,
36+
F2: Future<Output = F1::Output>,
37+
{
38+
type Output = F1::Output;
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
match self.project() {
42+
EitherProj::Left { fut } => fut.poll(cx),
43+
EitherProj::Right { fut } => fut.poll(cx),
44+
}
45+
}
46+
}

src/common/future.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
// TODO: replace with `std::future::poll_fn` once MSRV >= 1.64
8+
pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
9+
where
10+
F: FnMut(&mut Context<'_>) -> Poll<T>,
11+
{
12+
PollFn { f }
13+
}
14+
15+
pub(crate) struct PollFn<F> {
16+
f: F,
17+
}
18+
19+
impl<F> Unpin for PollFn<F> {}
20+
21+
impl<T, F> Future for PollFn<F>
22+
where
23+
F: FnMut(&mut Context<'_>) -> Poll<T>,
24+
{
25+
type Output = T;
26+
27+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
28+
(self.as_mut().f)(cx)
29+
}
30+
}

src/common/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22
pub(crate) mod buf;
33
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
44
pub(crate) mod date;
5+
#[cfg(all(feature = "client", feature = "http2"))]
6+
pub(crate) mod either;
7+
#[cfg(any(
8+
all(feature = "client", any(feature = "http1", feature = "http2")),
9+
all(feature = "server", feature = "http1"),
10+
))]
11+
pub(crate) mod future;
512
pub(crate) mod io;
613
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
714
pub(crate) mod task;

src/common/task.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use std::task::{Context, Poll};
2+
#[cfg(feature = "client")]
3+
use std::task::{RawWaker, RawWakerVTable, Waker};
24

35
/// A function to help "yield" a future, such that it is re-scheduled immediately.
46
///
@@ -7,3 +9,37 @@ pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<std::convert::Infallible>
79
cx.waker().wake_by_ref();
810
Poll::Pending
911
}
12+
13+
// TODO: replace with `std::task::Waker::noop()` once MSRV >= 1.85
14+
#[cfg(feature = "client")]
15+
fn noop_waker() -> Waker {
16+
const NOOP_RAW_WAKER: RawWaker = RawWaker::new(std::ptr::null(), &NOOP_VTABLE);
17+
const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
18+
// `clone` returns the same noop waker again
19+
|_: *const ()| NOOP_RAW_WAKER,
20+
// `wake`, `wake_by_ref`, and `drop` do nothing
21+
|_: *const ()| {},
22+
|_: *const ()| {},
23+
|_: *const ()| {},
24+
);
25+
26+
// SAFETY: all functions in the vtable are safe to call, and Waker's safety does not require
27+
// them to actually do anything.
28+
unsafe { Waker::from_raw(NOOP_RAW_WAKER) }
29+
}
30+
31+
/// Poll the future once and return `Some` if it is ready, else `None`.
32+
///
33+
/// If the future wasn't ready, it future likely can't be driven to completion any more: the polling
34+
/// uses a no-op waker, so knowledge of what the pending future was waiting for is lost.
35+
#[cfg(feature = "client")]
36+
pub(crate) fn now_or_never<F: std::future::Future>(fut: F) -> Option<F::Output> {
37+
let waker = noop_waker();
38+
let mut cx = Context::from_waker(&waker);
39+
// TODO: replace with std::pin::pin! and drop pin-utils once MSRV >= 1.68
40+
pin_utils::pin_mut!(fut);
41+
match fut.poll(&mut cx) {
42+
Poll::Ready(res) => Some(res),
43+
Poll::Pending => None,
44+
}
45+
}

src/common/time.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl Time {
4040
}
4141
}
4242

43-
#[cfg(feature = "http1")]
43+
#[cfg(all(feature = "server", feature = "http1"))]
4444
pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
4545
match *self {
4646
Time::Empty => {

src/common/watch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! - The consumer is only notified if the value is different.
55
//! - The value `0` is reserved for closed.
66
7-
use futures_util::task::AtomicWaker;
7+
use atomic_waker::AtomicWaker;
88
use std::sync::{
99
atomic::{AtomicUsize, Ordering},
1010
Arc,

src/proto/h1/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
1010

1111
use crate::rt::{Read, Write};
1212
use bytes::{Buf, Bytes};
13-
use futures_util::ready;
13+
use futures_core::ready;
1414
use http::header::{HeaderValue, CONNECTION, TE};
1515
use http::{HeaderMap, Method, Version};
1616
use http_body::Frame;

src/proto/h1/decode.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::io;
44
use std::task::{Context, Poll};
55

66
use bytes::{BufMut, Bytes, BytesMut};
7-
use futures_util::ready;
7+
use futures_core::ready;
88
use http::{HeaderMap, HeaderName, HeaderValue};
99
use http_body::Frame;
1010

src/proto/h1/dispatch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88

99
use crate::rt::{Read, Write};
1010
use bytes::{Buf, Bytes};
11-
use futures_util::ready;
11+
use futures_core::ready;
1212
use http::Request;
1313

1414
use super::{Http1Transaction, Wants};

src/proto/h1/io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::task::{Context, Poll};
66

77
use crate::rt::{Read, ReadBuf, Write};
88
use bytes::{Buf, BufMut, Bytes, BytesMut};
9-
use futures_util::ready;
9+
use futures_core::ready;
1010

1111
use super::{Http1Transaction, ParseContext, ParsedMessage};
1212
use crate::common::buf::BufList;

src/proto/h2/client.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ use crate::rt::{Read, Write};
1111
use bytes::Bytes;
1212
use futures_channel::mpsc::{Receiver, Sender};
1313
use futures_channel::{mpsc, oneshot};
14-
use futures_util::future::{Either, FusedFuture, FutureExt as _};
15-
use futures_util::ready;
16-
use futures_util::stream::{StreamExt as _, StreamFuture};
14+
use futures_core::{ready, FusedFuture, FusedStream, Stream};
1715
use h2::client::{Builder, Connection, SendRequest};
1816
use h2::SendStream;
1917
use http::{Method, StatusCode};
@@ -23,6 +21,7 @@ use super::ping::{Ponger, Recorder};
2321
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
2422
use crate::body::{Body, Incoming as IncomingBody};
2523
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24+
use crate::common::either::Either;
2625
use crate::common::io::Compat;
2726
use crate::common::time::Time;
2827
use crate::ext::Protocol;
@@ -164,21 +163,19 @@ where
164163
// 'Client' has been dropped. This is to get around a bug
165164
// in h2 where dropping all SendRequests won't notify a
166165
// parked Connection.
167-
let (conn_drop_ref, rx) = mpsc::channel(1);
166+
let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
168167
let (cancel_tx, conn_eof) = oneshot::channel();
169168

170-
let conn_drop_rx = rx.into_future();
171-
172169
let ping_config = new_ping_config(config);
173170

174171
let (conn, ping) = if ping_config.is_enabled() {
175172
let pp = conn.ping_pong().expect("conn.ping_pong");
176173
let (recorder, ponger) = ping::channel(pp, ping_config, timer);
177174

178175
let conn: Conn<_, B> = Conn::new(ponger, conn);
179-
(Either::Left(conn), recorder)
176+
(Either::left(conn), recorder)
180177
} else {
181-
(Either::Right(conn), ping::disabled())
178+
(Either::right(conn), ping::disabled())
182179
};
183180
let conn: ConnMapErr<T, B> = ConnMapErr {
184181
conn,
@@ -305,7 +302,7 @@ pin_project! {
305302
T: Unpin,
306303
{
307304
#[pin]
308-
drop_rx: StreamFuture<Receiver<Infallible>>,
305+
drop_rx: Receiver<Infallible>,
309306
#[pin]
310307
cancel_tx: Option<oneshot::Sender<Infallible>>,
311308
#[pin]
@@ -320,7 +317,7 @@ where
320317
{
321318
fn new(
322319
conn: ConnMapErr<T, B>,
323-
drop_rx: StreamFuture<Receiver<Infallible>>,
320+
drop_rx: Receiver<Infallible>,
324321
cancel_tx: oneshot::Sender<Infallible>,
325322
) -> Self {
326323
Self {
@@ -341,12 +338,12 @@ where
341338
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342339
let mut this = self.project();
343340

344-
if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
341+
if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
345342
// ok or err, the `conn` has finished.
346343
return Poll::Ready(());
347344
}
348345

349-
if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
346+
if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
350347
// mpsc has been dropped, hopefully polling
351348
// the connection some more should start shutdown
352349
// and then close.
@@ -468,7 +465,7 @@ where
468465
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
469466
let mut this = self.project();
470467

471-
match this.pipe.poll_unpin(cx) {
468+
match Pin::new(&mut this.pipe).poll(cx) {
472469
Poll::Ready(result) => {
473470
if let Err(_e) = result {
474471
debug!("client request body error: {}", _e);

src/proto/h2/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::pin::Pin;
66
use std::task::{Context, Poll};
77

88
use bytes::{Buf, Bytes};
9-
use futures_util::ready;
9+
use futures_core::ready;
1010
use h2::{Reason, RecvStream, SendStream};
1111
use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
1212
use http::HeaderMap;

src/proto/h2/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::task::{Context, Poll};
55
use std::time::Duration;
66

77
use bytes::Bytes;
8-
use futures_util::ready;
8+
use futures_core::ready;
99
use h2::server::{Connection, Handshake, SendResponse};
1010
use h2::{Reason, RecvStream};
1111
use http::{Method, Request};

0 commit comments

Comments
 (0)