Skip to content

Commit da158ba

Browse files
authored
Better handling for stream reset (#55)
1 parent a78d933 commit da158ba

File tree

4 files changed

+55
-21
lines changed

4 files changed

+55
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Changes
22

3-
## [1.8.5] - 2025-02-11
3+
## [1.8.5] - 2025-02-12
44

55
* Fix handle for REFUSED_STREAM reset
66

src/dispatcher.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use ntex_util::{spawn, HashMap};
77

88
use crate::connection::{Connection, RecvHalfConnection};
99
use crate::control::{Control, ControlAck};
10-
use crate::error::{ConnectionError, OperationError, StreamErrorInner};
10+
use crate::error::{ConnectionError, OperationError, StreamError, StreamErrorInner};
1111
use crate::frame::{Frame, GoAway, Ping, Reason, Reset, StreamId};
1212
use crate::{codec::Codec, message::Message, stream::StreamRef};
1313

@@ -71,14 +71,18 @@ where
7171
}
7272
Err(Either::Right(err)) => {
7373
let (stream, kind) = err.into_inner();
74-
log::error!(
75-
"{}: Failed to handle message, err: {:?} stream: {:?}",
76-
stream.tag(),
77-
kind,
78-
stream
79-
);
8074

81-
stream.set_failed_stream(kind.into());
75+
if !matches!(kind, StreamError::Reset(_)) {
76+
log::error!(
77+
"{}: Failed to handle frame, err: {:?} stream: {:?}",
78+
stream.tag(),
79+
kind,
80+
stream
81+
);
82+
} else {
83+
stream.set_failed_stream(kind.into());
84+
}
85+
8286
self.connection
8387
.encode(Reset::new(stream.id(), kind.reason()));
8488
publish(Message::error(kind, &stream), stream, &self.inner, ctx).await

src/stream.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,7 @@ impl StreamState {
183183
self.review_state();
184184
}
185185

186-
fn set_failed(&self) {
187-
self.insert_flag(StreamFlags::FAILED);
188-
self.send_cap.wake();
189-
self.send_reset.wake();
190-
}
191-
192186
fn reset_stream(&self, reason: Option<Reason>) {
193-
self.set_failed();
194187
self.recv.set(HalfState::Closed(reason));
195188
self.send.set(HalfState::Closed(None));
196189
if let Some(reason) = reason {
@@ -200,22 +193,21 @@ impl StreamState {
200193
}
201194

202195
fn remote_reset_stream(&self, reason: Reason) {
203-
self.set_failed();
204196
self.recv.set(HalfState::Closed(None));
205197
self.send.set(HalfState::Closed(Some(reason)));
206198
self.error.set(Some(OperationError::RemoteReset(reason)));
207199
self.review_state();
208200
}
209201

210202
fn failed(&self, err: OperationError) {
211-
self.set_failed();
212203
if !self.recv.get().is_closed() {
213204
self.recv.set(HalfState::Closed(None));
214205
}
215206
if !self.send.get().is_closed() {
216207
self.send.set(HalfState::Closed(None));
217208
}
218209
self.error.set(Some(err));
210+
self.insert_flag(StreamFlags::FAILED);
219211
self.review_state();
220212
}
221213

@@ -246,11 +238,12 @@ impl StreamState {
246238

247239
if let HalfState::Closed(reason) = self.send.get() {
248240
// stream is closed
249-
if reason.is_some() {
241+
if let Some(reason) = reason {
250242
log::trace!(
251-
"{}: {:?} is closed with local reset, dropping stream",
243+
"{}: {:?} is closed with remote reset {:?}, dropping stream",
252244
self.tag(),
253-
self.id
245+
self.id,
246+
reason
254247
);
255248
} else {
256249
log::trace!(
@@ -259,6 +252,7 @@ impl StreamState {
259252
self.id
260253
);
261254
}
255+
self.send_cap.wake();
262256
self.con.drop_stream(self.id);
263257
}
264258
}

tests/connection.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,42 @@ async fn test_goaway_on_overflow() {
326326
assert!(io.recv(&codec).await.unwrap().is_none());
327327
}
328328

329+
#[ntex::test]
330+
async fn test_stream_cancel() {
331+
let srv = start_server();
332+
let addr = srv.addr();
333+
334+
let io = connect(addr).await;
335+
let codec = Codec::default();
336+
let _ = io.with_write_buf(|buf| buf.extend_from_slice(&PREFACE));
337+
338+
let settings = frame::Settings::default();
339+
io.encode(settings.into(), &codec).unwrap();
340+
341+
// settings & window
342+
let _ = io.recv(&codec).await;
343+
let _ = io.recv(&codec).await;
344+
let _ = io.recv(&codec).await;
345+
346+
let id = frame::StreamId::CLIENT;
347+
let pseudo = frame::PseudoHeaders {
348+
method: Some(Method::GET),
349+
scheme: Some("HTTPS".into()),
350+
authority: Some("localhost".into()),
351+
path: Some("/".into()),
352+
..Default::default()
353+
};
354+
355+
let hdrs = frame::Headers::new(id, pseudo.clone(), HeaderMap::new(), false);
356+
io.send(hdrs.into(), &codec).await.unwrap();
357+
io.send(frame::Reset::new(id, frame::Reason::CANCEL).into(), &codec)
358+
.await
359+
.unwrap();
360+
361+
let reset = get_reset(io.recv(&codec).await.unwrap().unwrap());
362+
assert!(reset.reason() == frame::Reason::CANCEL);
363+
}
364+
329365
#[ntex::test]
330366
async fn test_goaway_on_reset() {
331367
let srv = start_server();

0 commit comments

Comments
 (0)