Skip to content

Commit 865f2bc

Browse files
authored
Allow to skip frames for unknown streams (#59)
1 parent 40987fa commit 865f2bc

File tree

6 files changed

+75
-19
lines changed

6 files changed

+75
-19
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [1.11.0] - 2025-07-01
4+
5+
* Allow to skip frames for unknown streams
6+
37
## [1.10.0] - 2025-06-29
48

59
* Generate unique id for simple client

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-h2"
3-
version = "1.10.0"
3+
version = "1.11.0"
44
license = "MIT OR Apache-2.0"
55
authors = ["Nikolay Kim <[email protected]>"]
66
description = "An HTTP/2 client and server"
@@ -24,12 +24,12 @@ features = []
2424

2525
[dependencies]
2626
ntex-net = "2"
27-
ntex-io = "2.9"
27+
ntex-io = "2.13"
2828
ntex-http = "0.1"
2929
ntex-bytes = "0.1"
3030
ntex-codec = "0.6"
31-
ntex-service = "3.4"
32-
ntex-util = "2.8"
31+
ntex-service = "3.5"
32+
ntex-util = "2.12"
3333

3434
bitflags = "2"
3535
fxhash = "0.2"

src/client/pool.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ impl Client {
175175
inner.config.clone(),
176176
inner.scheme.clone(),
177177
inner.authority.clone(),
178+
inner.skip_unknown_streams,
178179
storage,
179180
);
180181
inner.connections.borrow_mut().push(client);
@@ -273,6 +274,7 @@ struct Inner {
273274
conn_lifetime: Duration,
274275
disconnect_timeout: Millis,
275276
max_streams: u32,
277+
skip_unknown_streams: bool,
276278
scheme: Scheme,
277279
config: crate::Config,
278280
authority: ByteString,
@@ -311,6 +313,7 @@ impl ClientBuilder {
311313
conn_lifetime: Duration::from_secs(0),
312314
disconnect_timeout: Millis(15_000),
313315
max_streams: 100,
316+
skip_unknown_streams: false,
314317
minconn: 1,
315318
maxconn: 16,
316319
scheme: Scheme::HTTP,
@@ -358,6 +361,14 @@ impl ClientBuilder {
358361
self
359362
}
360363

364+
/// Do not return error for frames for unknown streams.
365+
///
366+
/// This includes pending resets, data and window update frames.
367+
pub fn skip_unknown_streams(mut self) -> Self {
368+
self.0.skip_unknown_streams = true;
369+
self
370+
}
371+
361372
/// Set max lifetime period for connection.
362373
///
363374
/// Connection lifetime is max lifetime of any opened connection

src/client/simple.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ impl SimpleClient {
3636
config,
3737
scheme,
3838
authority,
39+
false,
3940
InflightStorage::default(),
4041
)
4142
}
@@ -45,10 +46,11 @@ impl SimpleClient {
4546
config: Config,
4647
scheme: Scheme,
4748
authority: ByteString,
49+
skip_unknown_streams: bool,
4850
storage: InflightStorage,
4951
) -> Self {
5052
let codec = Codec::default();
51-
let con = Connection::new(io.get_ref(), codec, config, false);
53+
let con = Connection::new(io.get_ref(), codec, config, false, skip_unknown_streams);
5254
con.set_secure(scheme == Scheme::HTTPS);
5355

5456
let disp = Dispatcher::new(

src/connection.rs

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ bitflags::bitflags! {
2222
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
2323
pub(crate) struct ConnectionFlags: u8 {
2424
const SETTINGS_PROCESSED = 0b0000_0001;
25+
const UNKNOWN_STREAMS = 0b0000_0010;
2526
const DISCONNECT_WHEN_READY = 0b0000_1000;
2627
const SECURE = 0b0001_0000;
2728
const STREAM_REFUSED = 0b0010_0000;
@@ -62,7 +63,13 @@ struct ConnectionState {
6263
}
6364

6465
impl Connection {
65-
pub(crate) fn new(io: IoRef, codec: Codec, config: Config, secure: bool) -> Self {
66+
pub(crate) fn new(
67+
io: IoRef,
68+
codec: Codec,
69+
config: Config,
70+
secure: bool,
71+
skip_streams: bool,
72+
) -> Self {
6673
// send preface
6774
if !config.is_server() {
6875
let _ = io.with_write_buf(|buf| buf.extend_from_slice(&consts::PREFACE));
@@ -94,6 +101,15 @@ impl Connection {
94101

95102
let remote_frame_size = Cell::new(codec.send_frame_size());
96103

104+
let mut flags = if secure {
105+
ConnectionFlags::SECURE
106+
} else {
107+
ConnectionFlags::empty()
108+
};
109+
if !skip_streams {
110+
flags.insert(ConnectionFlags::UNKNOWN_STREAMS);
111+
}
112+
97113
let state = Rc::new(ConnectionState {
98114
codec,
99115
remote_frame_size,
@@ -113,11 +129,7 @@ impl Connection {
113129
local_pending_reset: Default::default(),
114130
remote_window_sz: Cell::new(frame::DEFAULT_INITIAL_WINDOW_SIZE),
115131
error: Cell::new(None),
116-
flags: Cell::new(if secure {
117-
ConnectionFlags::SECURE
118-
} else {
119-
ConnectionFlags::empty()
120-
}),
132+
flags: Cell::new(flags),
121133
});
122134
let con = Connection(state);
123135

@@ -408,7 +420,9 @@ impl Connection {
408420
}
409421

410422
// Add ids to pending queue
411-
self.0.local_pending_reset.add(id, &self.0.local_config);
423+
if flags.contains(ConnectionFlags::UNKNOWN_STREAMS) {
424+
self.0.local_pending_reset.add(id, &self.0.local_config);
425+
}
412426
}
413427

414428
pub(crate) fn recv_half(&self) -> RecvHalfConnection {
@@ -420,6 +434,12 @@ impl Connection {
420434
}
421435
}
422436

437+
impl ConnectionState {
438+
fn err_unknown_streams(&self) -> bool {
439+
self.flags.get().contains(ConnectionFlags::UNKNOWN_STREAMS)
440+
}
441+
}
442+
423443
impl RecvHalfConnection {
424444
pub(crate) fn tag(&self) -> &'static str {
425445
self.0.io.tag()
@@ -455,8 +475,9 @@ impl RecvHalfConnection {
455475
frm: Headers,
456476
) -> Result<Option<(StreamRef, Message)>, Either<ConnectionError, StreamErrorInner>> {
457477
let id = frm.stream_id();
478+
let is_server = self.0.local_config.is_server();
458479

459-
if self.0.local_config.is_server() && !id.is_client_initiated() {
480+
if is_server && !id.is_client_initiated() {
460481
return Err(Either::Left(ConnectionError::InvalidStreamId(
461482
"Invalid id in received headers frame",
462483
)));
@@ -467,7 +488,9 @@ impl RecvHalfConnection {
467488
Ok(item) => Ok(item.map(move |msg| (stream, msg))),
468489
Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))),
469490
}
470-
} else if !self.0.local_config.is_server() && self.0.local_pending_reset.is_pending(id) {
491+
} else if !is_server
492+
&& (!self.0.err_unknown_streams() || self.0.local_pending_reset.is_pending(id))
493+
{
471494
// if client and no stream, then it was closed
472495
self.encode(frame::Reset::new(id, frame::Reason::STREAM_CLOSED));
473496
Ok(None)
@@ -543,7 +566,9 @@ impl RecvHalfConnection {
543566
Ok(item) => Ok(item.map(move |msg| (stream, msg))),
544567
Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))),
545568
}
546-
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
569+
} else if !self.0.err_unknown_streams()
570+
|| self.0.local_pending_reset.is_pending(frm.stream_id())
571+
{
547572
self.encode(frame::Reset::new(
548573
frm.stream_id(),
549574
frame::Reason::STREAM_CLOSED,
@@ -684,11 +709,17 @@ impl RecvHalfConnection {
684709
.map_err(|kind| Either::Right(StreamErrorInner::new(stream, kind)))
685710
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
686711
Ok(())
687-
} else {
712+
} else if self.0.err_unknown_streams() {
688713
log::trace!("Unknown WINDOW_UPDATE {:?}", frm);
689714
Err(Either::Left(ConnectionError::UnknownStream(
690715
"WINDOW_UPDATE",
691716
)))
717+
} else {
718+
self.encode(frame::Reset::new(
719+
frm.stream_id(),
720+
frame::Reason::STREAM_CLOSED,
721+
));
722+
Ok(())
692723
}
693724
}
694725

@@ -724,9 +755,11 @@ impl RecvHalfConnection {
724755
)))
725756
} else if self.0.local_pending_reset.remove(id) {
726757
self.update_rst_count()
727-
} else {
758+
} else if self.0.err_unknown_streams() {
728759
self.update_rst_count()?;
729760
Err(Either::Left(ConnectionError::UnknownStream("RST_STREAM")))
761+
} else {
762+
Ok(())
730763
}
731764
}
732765

src/server/service.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,13 @@ where
136136

137137
// create h2 codec
138138
let codec = Codec::default();
139-
let con = Connection::new(io.get_ref(), codec.clone(), inner.config.clone(), true);
139+
let con = Connection::new(
140+
io.get_ref(),
141+
codec.clone(),
142+
inner.config.clone(),
143+
true,
144+
false,
145+
);
140146
let con2 = con.clone();
141147

142148
// start protocol dispatcher
@@ -249,7 +255,7 @@ where
249255

250256
// create h2 codec
251257
let codec = Codec::default();
252-
let con = Connection::new(io.get_ref(), codec.clone(), config.clone(), true);
258+
let con = Connection::new(io.get_ref(), codec.clone(), config.clone(), true, false);
253259
let con2 = con.clone();
254260

255261
// start protocol dispatcher

0 commit comments

Comments
 (0)