diff --git a/CHANGELOG.md b/CHANGELOG.md index 179e368..139df1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changes -## [1.8.5] - 2025-02-11 +## [1.8.5] - 2025-02-12 * Fix handle for REFUSED_STREAM reset diff --git a/src/dispatcher.rs b/src/dispatcher.rs index acfc602..177b47c 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -7,7 +7,7 @@ use ntex_util::{spawn, HashMap}; use crate::connection::{Connection, RecvHalfConnection}; use crate::control::{Control, ControlAck}; -use crate::error::{ConnectionError, OperationError, StreamErrorInner}; +use crate::error::{ConnectionError, OperationError, StreamError, StreamErrorInner}; use crate::frame::{Frame, GoAway, Ping, Reason, Reset, StreamId}; use crate::{codec::Codec, message::Message, stream::StreamRef}; @@ -71,14 +71,18 @@ where } Err(Either::Right(err)) => { let (stream, kind) = err.into_inner(); - log::error!( - "{}: Failed to handle message, err: {:?} stream: {:?}", - stream.tag(), - kind, - stream - ); - stream.set_failed_stream(kind.into()); + if !matches!(kind, StreamError::Reset(_)) { + log::error!( + "{}: Failed to handle frame, err: {:?} stream: {:?}", + stream.tag(), + kind, + stream + ); + } else { + stream.set_failed_stream(kind.into()); + } + self.connection .encode(Reset::new(stream.id(), kind.reason())); publish(Message::error(kind, &stream), stream, &self.inner, ctx).await diff --git a/src/stream.rs b/src/stream.rs index 68acabc..4117123 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -183,14 +183,7 @@ impl StreamState { self.review_state(); } - fn set_failed(&self) { - self.insert_flag(StreamFlags::FAILED); - self.send_cap.wake(); - self.send_reset.wake(); - } - fn reset_stream(&self, reason: Option) { - self.set_failed(); self.recv.set(HalfState::Closed(reason)); self.send.set(HalfState::Closed(None)); if let Some(reason) = reason { @@ -200,7 +193,6 @@ impl StreamState { } fn remote_reset_stream(&self, reason: Reason) { - self.set_failed(); self.recv.set(HalfState::Closed(None)); self.send.set(HalfState::Closed(Some(reason))); self.error.set(Some(OperationError::RemoteReset(reason))); @@ -208,7 +200,6 @@ impl StreamState { } fn failed(&self, err: OperationError) { - self.set_failed(); if !self.recv.get().is_closed() { self.recv.set(HalfState::Closed(None)); } @@ -216,6 +207,7 @@ impl StreamState { self.send.set(HalfState::Closed(None)); } self.error.set(Some(err)); + self.insert_flag(StreamFlags::FAILED); self.review_state(); } @@ -246,11 +238,12 @@ impl StreamState { if let HalfState::Closed(reason) = self.send.get() { // stream is closed - if reason.is_some() { + if let Some(reason) = reason { log::trace!( - "{}: {:?} is closed with local reset, dropping stream", + "{}: {:?} is closed with remote reset {:?}, dropping stream", self.tag(), - self.id + self.id, + reason ); } else { log::trace!( @@ -259,6 +252,7 @@ impl StreamState { self.id ); } + self.send_cap.wake(); self.con.drop_stream(self.id); } } diff --git a/tests/connection.rs b/tests/connection.rs index 1e801c1..35f30c9 100644 --- a/tests/connection.rs +++ b/tests/connection.rs @@ -326,6 +326,42 @@ async fn test_goaway_on_overflow() { assert!(io.recv(&codec).await.unwrap().is_none()); } +#[ntex::test] +async fn test_stream_cancel() { + let srv = start_server(); + let addr = srv.addr(); + + let io = connect(addr).await; + let codec = Codec::default(); + let _ = io.with_write_buf(|buf| buf.extend_from_slice(&PREFACE)); + + let settings = frame::Settings::default(); + io.encode(settings.into(), &codec).unwrap(); + + // settings & window + let _ = io.recv(&codec).await; + let _ = io.recv(&codec).await; + let _ = io.recv(&codec).await; + + let id = frame::StreamId::CLIENT; + let pseudo = frame::PseudoHeaders { + method: Some(Method::GET), + scheme: Some("HTTPS".into()), + authority: Some("localhost".into()), + path: Some("/".into()), + ..Default::default() + }; + + let hdrs = frame::Headers::new(id, pseudo.clone(), HeaderMap::new(), false); + io.send(hdrs.into(), &codec).await.unwrap(); + io.send(frame::Reset::new(id, frame::Reason::CANCEL).into(), &codec) + .await + .unwrap(); + + let reset = get_reset(io.recv(&codec).await.unwrap().unwrap()); + assert!(reset.reason() == frame::Reason::CANCEL); +} + #[ntex::test] async fn test_goaway_on_reset() { let srv = start_server();