Skip to content

Commit f473a2d

Browse files
authored
Simplify delay reset queue (#56)
1 parent da158ba commit f473a2d

File tree

5 files changed

+110
-94
lines changed

5 files changed

+110
-94
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [1.8.6] - 2025-03-12
4+
5+
* Simplify delay reset queue
6+
37
## [1.8.5] - 2025-02-12
48

59
* Fix handle for REFUSED_STREAM reset

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-h2"
3-
version = "1.8.5"
3+
version = "1.8.6"
44
license = "MIT OR Apache-2.0"
55
authors = ["Nikolay Kim <[email protected]>"]
66
description = "An HTTP/2 client and server"

src/config.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub(crate) struct ConfigInner {
2525
pub(crate) window_sz_threshold: Cell<WindowSize>,
2626
/// How long a locally reset stream should ignore frames
2727
pub(crate) reset_duration: Cell<Duration>,
28+
/// Maximum number of locally reset streams to keep at a time
29+
pub(crate) reset_max: Cell<usize>,
2830
/// Initial window size for new connections.
2931
pub(crate) connection_window_sz: Cell<WindowSize>,
3032
pub(crate) connection_window_sz_threshold: Cell<WindowSize>,
@@ -89,6 +91,7 @@ impl Config {
8991
connection_window_sz_threshold,
9092
dispatcher_config,
9193
settings: Cell::new(settings),
94+
reset_max: Cell::new(consts::DEFAULT_RESET_STREAM_MAX),
9295
reset_duration: Cell::new(consts::DEFAULT_RESET_STREAM_SECS.into()),
9396
remote_max_concurrent_streams: Cell::new(None),
9497
max_header_continuations: Cell::new(consts::DEFAULT_MAX_COUNTINUATIONS),
@@ -232,10 +235,9 @@ impl Config {
232235
/// received for that stream will result in a connection level protocol
233236
/// error, forcing the connection to terminate.
234237
///
235-
/// The default value is 30.
236-
#[doc(hidden)]
237-
#[deprecated]
238-
pub fn max_concurrent_reset_streams(&self, _: usize) -> &Self {
238+
/// The default value is 32.
239+
pub fn max_concurrent_reset_streams(&self, val: usize) -> &Self {
240+
self.0.reset_max.set(val);
239241
self
240242
}
241243

@@ -355,6 +357,7 @@ impl fmt::Debug for Config {
355357
.field("window_sz", &self.0.window_sz.get())
356358
.field("window_sz_threshold", &self.0.window_sz_threshold.get())
357359
.field("reset_duration", &self.0.reset_duration.get())
360+
.field("reset_max", &self.0.reset_max.get())
358361
.field("connection_window_sz", &self.0.connection_window_sz.get())
359362
.field(
360363
"connection_window_sz_threshold",
@@ -368,23 +371,3 @@ impl fmt::Debug for Config {
368371
.finish()
369372
}
370373
}
371-
372-
impl fmt::Debug for ConfigInner {
373-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374-
f.debug_struct("Config")
375-
.field("window_sz", &self.window_sz.get())
376-
.field("window_sz_threshold", &self.window_sz_threshold.get())
377-
.field("reset_duration", &self.reset_duration.get())
378-
.field("connection_window_sz", &self.connection_window_sz.get())
379-
.field(
380-
"connection_window_sz_threshold",
381-
&self.connection_window_sz_threshold.get(),
382-
)
383-
.field(
384-
"remote_max_concurrent_streams",
385-
&self.remote_max_concurrent_streams.get(),
386-
)
387-
.field("settings", &self.settings.get())
388-
.finish()
389-
}
390-
}

src/connection.rs

Lines changed: 96 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{cell::Cell, cell::RefCell, fmt, mem, rc::Rc};
2-
use std::{collections::VecDeque, time::Duration, time::Instant};
2+
use std::{collections::VecDeque, time::Instant};
33

44
use ntex_bytes::{ByteString, Bytes};
55
use ntex_http::{HeaderMap, Method};
@@ -408,7 +408,7 @@ impl Connection {
408408
}
409409

410410
// Add ids to pending queue
411-
self.0.local_pending_reset.add(id);
411+
self.0.local_pending_reset.add(id, &self.0.local_config);
412412
}
413413

414414
pub(crate) fn recv_half(&self) -> RecvHalfConnection {
@@ -722,7 +722,7 @@ impl RecvHalfConnection {
722722
stream,
723723
StreamError::Reset(frm.reason()),
724724
)))
725-
} else if self.0.local_pending_reset.is_pending(id) {
725+
} else if self.0.local_pending_reset.remove(id) {
726726
self.update_rst_count()
727727
} else {
728728
self.update_rst_count()?;
@@ -866,79 +866,71 @@ async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
866866
}
867867
}
868868

869-
const BLOCKS: usize = 5;
870-
const LAST_BLOCK: usize = 4;
869+
struct Pending(Cell<Option<Box<PendingInner>>>);
871870

872-
#[cfg(not(test))]
873-
const SECS: u64 = 30;
874-
#[cfg(test)]
875-
const SECS: u64 = 1;
876-
877-
const BLOCK_SIZE: Duration = Duration::from_secs(SECS);
878-
const ALL_BLOCKS: Duration = Duration::from_secs((BLOCKS as u64) * SECS);
879-
880-
#[derive(Default)]
881-
struct Pending {
882-
idx: Cell<u8>,
883-
blocks: RefCell<[Block; BLOCKS]>,
884-
}
885-
886-
#[derive(Debug)]
887-
struct Block {
888-
start_time: Instant,
871+
struct PendingInner {
889872
ids: HashSet<StreamId>,
873+
queue: VecDeque<(StreamId, Instant)>,
890874
}
891875

892-
impl Pending {
893-
fn add(&self, id: StreamId) {
894-
let cur = now();
895-
let idx = self.idx.get() as usize;
896-
let mut blocks = self.blocks.borrow_mut();
897-
898-
// check if we need to insert new block
899-
if blocks[idx].start_time < (cur - BLOCK_SIZE) {
900-
// shift blocks
901-
let idx = if idx == 0 { LAST_BLOCK } else { idx - 1 };
902-
// insert new item
903-
blocks[idx].start_time = cur;
904-
blocks[idx].ids.clear();
905-
blocks[idx].ids.insert(id);
906-
self.idx.set(idx as u8);
907-
} else {
908-
blocks[idx].ids.insert(id);
909-
}
876+
impl Default for Pending {
877+
fn default() -> Self {
878+
Self(Cell::new(Some(Box::new(PendingInner {
879+
ids: HashSet::default(),
880+
queue: VecDeque::with_capacity(16),
881+
}))))
910882
}
883+
}
911884

912-
fn is_pending(&self, id: StreamId) -> bool {
913-
let blocks = self.blocks.borrow_mut();
885+
impl Pending {
886+
fn add(&self, id: StreamId, config: &Config) {
887+
let mut inner = self.0.take().unwrap();
914888

915-
let max = now() - ALL_BLOCKS;
916-
let mut idx = self.idx.get() as usize;
889+
let current_time = now();
917890

918-
loop {
919-
let item = &blocks[idx];
920-
if item.start_time < max {
891+
// remove old ids
892+
let max_time = current_time - config.0.reset_duration.get();
893+
while let Some(item) = inner.queue.front() {
894+
if item.1 < max_time {
895+
inner.ids.remove(&item.0);
896+
inner.queue.pop_front();
897+
} else {
921898
break;
922-
} else if item.ids.contains(&id) {
923-
return true;
924899
}
925-
idx += 1;
926-
if idx == BLOCKS {
927-
idx = 0;
928-
} else if idx == self.idx.get() as usize {
929-
break;
900+
}
901+
902+
// shrink size of ids
903+
while inner.queue.len() >= config.0.reset_max.get() {
904+
if let Some((id, _)) = inner.queue.pop_front() {
905+
inner.ids.remove(&id);
930906
}
931907
}
932-
false
908+
909+
inner.ids.insert(id);
910+
inner.queue.push_back((id, current_time));
911+
self.0.set(Some(inner));
933912
}
934-
}
935913

936-
impl Default for Block {
937-
fn default() -> Self {
938-
Self {
939-
ids: HashSet::default(),
940-
start_time: now() - ALL_BLOCKS,
914+
fn remove(&self, id: StreamId) -> bool {
915+
let mut inner = self.0.take().unwrap();
916+
let removed = inner.ids.remove(&id);
917+
if removed {
918+
for idx in 0..inner.queue.len() {
919+
if inner.queue[idx].0 == id {
920+
inner.queue.remove(idx);
921+
break;
922+
}
923+
}
941924
}
925+
self.0.set(Some(inner));
926+
removed
927+
}
928+
929+
fn is_pending(&self, id: StreamId) -> bool {
930+
let inner = self.0.take().unwrap();
931+
let pending = inner.ids.contains(&id);
932+
self.0.set(Some(inner));
933+
pending
942934
}
943935
}
944936

@@ -1017,11 +1009,16 @@ mod tests {
10171009

10181010
#[ntex::test]
10191011
async fn test_delay_reset_queue() {
1012+
let _ = env_logger::try_init();
1013+
10201014
let srv = test_server(|| {
10211015
fn_service(|io: Io<_>| async move {
10221016
let _ = h2::server::handle_one(
10231017
io.into(),
1024-
h2::Config::server().ping_timeout(Seconds::ZERO).clone(),
1018+
h2::Config::server()
1019+
.ping_timeout(Seconds::ZERO)
1020+
.reset_stream_duration(Seconds(1))
1021+
.clone(),
10251022
fn_service(|msg: h2::ControlMessage<h2::StreamError>| async move {
10261023
Ok::<_, ()>(msg.ack())
10271024
}),
@@ -1037,7 +1034,7 @@ mod tests {
10371034
});
10381035

10391036
let addr = ntex::connect::Connect::new("localhost").set_addr(Some(srv.addr()));
1040-
let io = ntex::connect::connect(addr).await.unwrap();
1037+
let io = ntex::connect::connect(addr.clone()).await.unwrap();
10411038
let codec = Codec::default();
10421039
let _ = io.with_write_buf(|buf| buf.extend_from_slice(&PREFACE));
10431040

@@ -1081,14 +1078,45 @@ mod tests {
10811078
assert_eq!(res.reason(), Reason::NO_ERROR);
10821079

10831080
// prev closed stream
1084-
io.send(pl.clone().into(), &codec).await.unwrap();
1081+
io.send(pl.into(), &codec).await.unwrap();
1082+
let res = goaway(io.recv(&codec).await.unwrap().unwrap());
1083+
assert_eq!(res.reason(), Reason::PROTOCOL_ERROR);
1084+
1085+
// SECOND connection
1086+
let io = ntex::connect::connect(addr).await.unwrap();
1087+
let codec = Codec::default();
1088+
let _ = io.with_write_buf(|buf| buf.extend_from_slice(&PREFACE));
1089+
1090+
let settings = frame::Settings::default();
1091+
io.encode(settings.into(), &codec).unwrap();
1092+
1093+
// settings & window
1094+
let _ = io.recv(&codec).await;
1095+
let _ = io.recv(&codec).await;
1096+
let _ = io.recv(&codec).await;
1097+
1098+
let id = frame::StreamId::CLIENT;
1099+
let pseudo = frame::PseudoHeaders {
1100+
method: Some(Method::GET),
1101+
scheme: Some("HTTPS".into()),
1102+
authority: Some("localhost".into()),
1103+
path: Some("/".into()),
1104+
..Default::default()
1105+
};
1106+
let hdrs = frame::Headers::new(id, pseudo.clone(), HeaderMap::new(), false);
1107+
io.send(hdrs.into(), &codec).await.unwrap();
1108+
1109+
// server resets stream
10851110
let res = get_reset(io.recv(&codec).await.unwrap().unwrap());
1086-
assert_eq!(res.reason(), Reason::STREAM_CLOSED);
1111+
assert_eq!(res.reason(), Reason::NO_ERROR);
10871112

1088-
sleep(Millis(5100)).await;
1113+
// after server receives remote reset, any next frame cause protocol error
1114+
io.send(frame::Reset::new(id, Reason::NO_ERROR).into(), &codec)
1115+
.await
1116+
.unwrap();
10891117

1090-
// prev closed stream
1091-
io.send(pl.into(), &codec).await.unwrap();
1118+
let pl = frame::Data::new(id, Bytes::from_static(b"data"));
1119+
io.send(pl.clone().into(), &codec).await.unwrap();
10921120
let res = goaway(io.recv(&codec).await.unwrap().unwrap());
10931121
assert_eq!(res.reason(), Reason::PROTOCOL_ERROR);
10941122
}

src/consts.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use crate::frame::WindowSize;
55

66
// Constants
77
pub(crate) const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
8-
pub(crate) const DEFAULT_RESET_STREAM_SECS: Seconds = Seconds(10);
8+
pub(crate) const DEFAULT_RESET_STREAM_MAX: usize = 32;
9+
pub(crate) const DEFAULT_RESET_STREAM_SECS: Seconds = Seconds(30);
910
pub(crate) const DEFAULT_CONNECTION_WINDOW_SIZE: WindowSize = 1_048_576;
1011
pub(crate) const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 48 * 1024;
1112
pub(crate) const DEFAULT_MAX_COUNTINUATIONS: usize = 5;

0 commit comments

Comments
 (0)