Skip to content

Commit 3e7b9d1

Browse files
authored
Better handling for delay reset queue (#50)
1 parent 5575d92 commit 3e7b9d1

File tree

4 files changed

+67
-70
lines changed

4 files changed

+67
-70
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [1.8.2] - 2025-02-08
44

5+
* Better handling for delay reset queue
56
* Fix connection level window size handling
67

78
## [1.8.1] - 2025-01-31

src/config.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ pub struct Config(pub(crate) Rc<ConfigInner>);
1919

2020
/// Http2 connection configuration
2121
pub(crate) struct ConfigInner {
22+
pub(crate) settings: Cell<Settings>,
2223
/// Initial window size of locally initiated streams
2324
pub(crate) window_sz: Cell<WindowSize>,
2425
pub(crate) window_sz_threshold: Cell<WindowSize>,
2526
/// How long a locally reset stream should ignore frames
2627
pub(crate) reset_duration: Cell<Duration>,
27-
/// Maximum number of locally reset streams to keep at a time
28-
pub(crate) reset_max: Cell<usize>,
29-
pub(crate) settings: Cell<Settings>,
3028
/// Initial window size for new connections.
3129
pub(crate) connection_window_sz: Cell<WindowSize>,
3230
pub(crate) connection_window_sz_threshold: Cell<WindowSize>,
@@ -91,7 +89,6 @@ impl Config {
9189
connection_window_sz_threshold,
9290
dispatcher_config,
9391
settings: Cell::new(settings),
94-
reset_max: Cell::new(consts::DEFAULT_RESET_STREAM_MAX),
9592
reset_duration: Cell::new(consts::DEFAULT_RESET_STREAM_SECS.into()),
9693
remote_max_concurrent_streams: Cell::new(None),
9794
max_header_continuations: Cell::new(consts::DEFAULT_MAX_COUNTINUATIONS),
@@ -236,8 +233,9 @@ impl Config {
236233
/// error, forcing the connection to terminate.
237234
///
238235
/// The default value is 30.
239-
pub fn max_concurrent_reset_streams(&self, max: usize) -> &Self {
240-
self.0.reset_max.set(max);
236+
#[doc(hidden)]
237+
#[deprecated]
238+
pub fn max_concurrent_reset_streams(&self, _: usize) -> &Self {
241239
self
242240
}
243241

@@ -357,7 +355,6 @@ impl fmt::Debug for Config {
357355
.field("window_sz", &self.0.window_sz.get())
358356
.field("window_sz_threshold", &self.0.window_sz_threshold.get())
359357
.field("reset_duration", &self.0.reset_duration.get())
360-
.field("reset_max", &self.0.reset_max.get())
361358
.field("connection_window_sz", &self.0.connection_window_sz.get())
362359
.field(
363360
"connection_window_sz_threshold",
@@ -378,7 +375,6 @@ impl fmt::Debug for ConfigInner {
378375
.field("window_sz", &self.window_sz.get())
379376
.field("window_sz_threshold", &self.window_sz_threshold.get())
380377
.field("reset_duration", &self.reset_duration.get())
381-
.field("reset_max", &self.reset_max.get())
382378
.field("connection_window_sz", &self.connection_window_sz.get())
383379
.field(
384380
"connection_window_sz_threshold",

src/connection.rs

Lines changed: 62 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::{cell::Cell, cell::RefCell, fmt, mem, rc::Rc};
2-
use std::{collections::VecDeque, time::Instant};
2+
use std::{collections::VecDeque, time::Duration, time::Instant};
33

44
use ntex_bytes::{ByteString, Bytes};
55
use ntex_http::{HeaderMap, Method};
66
use ntex_io::IoRef;
77
use ntex_util::time::{self, now, sleep};
8-
use ntex_util::{channel::pool, future::Either, spawn, HashMap, HashSet};
8+
use ntex_util::{channel::pool, future::Either, spawn, HashMap};
99

1010
use crate::config::{Config, ConfigInner};
1111
use crate::error::{ConnectionError, OperationError, StreamError, StreamErrorInner};
@@ -22,7 +22,6 @@ bitflags::bitflags! {
2222
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
2323
pub(crate) struct ConnectionFlags: u8 {
2424
const SETTINGS_PROCESSED = 0b0000_0001;
25-
const DELAY_DROP_TASK_STARTED = 0b0000_0010;
2625
const DISCONNECT_WHEN_READY = 0b0000_1000;
2726
const SECURE = 0b0001_0000;
2827
const STREAM_REFUSED = 0b0010_0000;
@@ -55,8 +54,7 @@ struct ConnectionState {
5554
// Max frame size
5655
remote_frame_size: Cell<u32>,
5756
// Locally reset streams
58-
local_reset_queue: RefCell<VecDeque<(StreamId, Instant)>>,
59-
local_reset_ids: RefCell<HashSet<StreamId>>,
57+
local_pending_reset: Pending,
6058
// protocol level error
6159
error: Cell<Option<OperationError>>,
6260
// connection state flags
@@ -112,8 +110,7 @@ impl Connection {
112110
next_stream_id: Cell::new(StreamId::new(1)),
113111
local_config: config,
114112
local_max_concurrent_streams: Cell::new(None),
115-
local_reset_ids: RefCell::new(HashSet::default()),
116-
local_reset_queue: RefCell::new(VecDeque::new()),
113+
local_pending_reset: Default::default(),
117114
remote_window_sz: Cell::new(frame::DEFAULT_INITIAL_WINDOW_SIZE),
118115
error: Cell::new(None),
119116
flags: Cell::new(if secure {
@@ -410,20 +407,8 @@ impl Connection {
410407
return;
411408
}
412409

413-
let mut ids = self.0.local_reset_ids.borrow_mut();
414-
let mut queue = self.0.local_reset_queue.borrow_mut();
415-
416-
// check queue size
417-
if queue.len() >= self.0.local_config.0.reset_max.get() {
418-
if let Some((id, _)) = queue.pop_front() {
419-
ids.remove(&id);
420-
}
421-
}
422-
ids.insert(id);
423-
queue.push_back((id, now() + self.0.local_config.0.reset_duration.get()));
424-
if !flags.contains(ConnectionFlags::DELAY_DROP_TASK_STARTED) {
425-
let _ = spawn(delay_drop_task(self.clone()));
426-
}
410+
// Add ids to pending queue
411+
self.0.local_pending_reset.add(id);
427412
}
428413

429414
pub(crate) fn recv_half(&self) -> RecvHalfConnection {
@@ -482,7 +467,8 @@ impl RecvHalfConnection {
482467
Ok(item) => Ok(item.map(move |msg| (stream, msg))),
483468
Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))),
484469
}
485-
} else if self.0.local_reset_ids.borrow().contains(&id) {
470+
} else if !self.0.local_config.is_server() && self.0.local_pending_reset.is_pending(id) {
471+
// if client and no stream, then it was closed
486472
self.encode(frame::Reset::new(id, frame::Reason::STREAM_CLOSED));
487473
Ok(None)
488474
} else if id < self.0.next_stream_id.get() {
@@ -562,7 +548,7 @@ impl RecvHalfConnection {
562548
Ok(item) => Ok(item.map(move |msg| (stream, msg))),
563549
Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))),
564550
}
565-
} else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) {
551+
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
566552
self.encode(frame::Reset::new(
567553
frm.stream_id(),
568554
frame::Reason::STREAM_CLOSED,
@@ -701,7 +687,7 @@ impl RecvHalfConnection {
701687
stream
702688
.recv_window_update(frm)
703689
.map_err(|kind| Either::Right(StreamErrorInner::new(stream, kind)))
704-
} else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) {
690+
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
705691
Ok(())
706692
} else {
707693
log::trace!("Unknown WINDOW_UPDATE {:?}", frm);
@@ -738,7 +724,7 @@ impl RecvHalfConnection {
738724
stream,
739725
StreamError::Reset(frm.reason()),
740726
)))
741-
} else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) {
727+
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
742728
self.update_rst_count()
743729
} else {
744730
self.update_rst_count()?;
@@ -851,42 +837,6 @@ impl fmt::Debug for Connection {
851837
}
852838
}
853839

854-
async fn delay_drop_task(state: Connection) {
855-
state.set_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED);
856-
857-
loop {
858-
let next = if let Some(item) = state.0.local_reset_queue.borrow().front() {
859-
item.1 - now()
860-
} else {
861-
break;
862-
};
863-
sleep(next).await;
864-
865-
if state.is_closed() {
866-
return;
867-
}
868-
869-
let now = now();
870-
let mut ids = state.0.local_reset_ids.borrow_mut();
871-
let mut queue = state.0.local_reset_queue.borrow_mut();
872-
loop {
873-
if let Some(item) = queue.front() {
874-
if item.1 <= now {
875-
log::trace!("{}: dropping {:?} after delay", state.tag(), item.0);
876-
ids.remove(&item.0);
877-
queue.pop_front();
878-
} else {
879-
break;
880-
}
881-
} else {
882-
state.unset_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED);
883-
return;
884-
}
885-
}
886-
}
887-
state.unset_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED);
888-
}
889-
890840
async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
891841
log::debug!("start http client ping/pong task");
892842

@@ -917,3 +867,54 @@ async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
917867
st.0.pings_count.set(st.0.pings_count.get() + 1);
918868
}
919869
}
870+
871+
const BLOCKS: usize = 6;
872+
const BLOCK_SIZE: Duration = Duration::from_secs(30);
873+
const LAST_BLOCK: usize = 5;
874+
875+
#[derive(Default)]
876+
struct Pending {
877+
times: RefCell<[Option<(StreamId, Instant)>; BLOCKS]>,
878+
}
879+
880+
impl Pending {
881+
fn add(&self, id: StreamId) {
882+
let cur = now();
883+
let mut times = self.times.borrow_mut();
884+
885+
if let Some(item) = &times[0] {
886+
// check if we need to insert new block
887+
if item.1 < (cur - BLOCK_SIZE) {
888+
// shift blocks
889+
let mut i = LAST_BLOCK - 1;
890+
loop {
891+
times[i + 1] = times[i];
892+
if i == 0 {
893+
break;
894+
}
895+
i -= 1;
896+
}
897+
// insert new item
898+
times[0] = Some((id, cur));
899+
}
900+
} else {
901+
// insert new item
902+
times[0] = Some((id, cur));
903+
}
904+
}
905+
906+
fn is_pending(&self, id: StreamId) -> bool {
907+
let times = self.times.borrow();
908+
let mut idx = LAST_BLOCK;
909+
loop {
910+
if let Some(item) = &times[idx] {
911+
return id >= item.0;
912+
} else if idx == 0 {
913+
break;
914+
} else {
915+
idx -= 1;
916+
}
917+
}
918+
false
919+
}
920+
}

src/consts.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::frame::WindowSize;
55

66
// Constants
77
pub(crate) const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
8-
pub(crate) const DEFAULT_RESET_STREAM_MAX: usize = 30;
98
pub(crate) const DEFAULT_RESET_STREAM_SECS: Seconds = Seconds(10);
109
pub(crate) const DEFAULT_CONNECTION_WINDOW_SIZE: WindowSize = 1_048_576;
1110
pub(crate) const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 48 * 1024;

0 commit comments

Comments
 (0)