Skip to content

Commit 7420a99

Browse files
committed
Handle implicit resets at the right time
A stream whose ref count reaches zero while open should not immediately decrease the number of active streams, otherwise MAX_CONCURRENT_STREAMS isn't respected anymore.
1 parent d7c56f4 commit 7420a99

File tree

3 files changed

+165
-22
lines changed

3 files changed

+165
-22
lines changed

src/proto/streams/prioritize.rs

+1
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,7 @@ impl Prioritize {
839839
}),
840840
None => {
841841
if let Some(reason) = stream.state.get_scheduled_reset() {
842+
stream.state.did_schedule_reset();
842843
stream.set_reset(reason, Initiator::Library);
843844

844845
let frame = frame::Reset::new(stream.id, reason);

src/proto/streams/state.rs

+27-22
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,20 @@ enum Inner {
5858
// TODO: these states shouldn't count against concurrency limits:
5959
ReservedLocal,
6060
ReservedRemote,
61-
Open { local: Peer, remote: Peer },
61+
Open {
62+
local: Peer,
63+
remote: Peer,
64+
},
6265
HalfClosedLocal(Peer), // TODO: explicitly name this value
6366
HalfClosedRemote(Peer),
67+
/// This indicates to the connection that a reset frame must be sent out
68+
/// once the send queue has been flushed.
69+
///
70+
/// Examples of when this could happen:
71+
/// - User drops all references to a stream, so we want to CANCEL the it.
72+
/// - Header block size was too large, so we want to REFUSE, possibly
73+
/// after sending a 431 response frame.
74+
ScheduledReset(Reason),
6475
Closed(Cause),
6576
}
6677

@@ -75,15 +86,6 @@ enum Peer {
7586
enum Cause {
7687
EndStream,
7788
Error(Error),
78-
79-
/// This indicates to the connection that a reset frame must be sent out
80-
/// once the send queue has been flushed.
81-
///
82-
/// Examples of when this could happen:
83-
/// - User drops all references to a stream, so we want to CANCEL the it.
84-
/// - Header block size was too large, so we want to REFUSE, possibly
85-
/// after sending a 431 response frame.
86-
ScheduledLibraryReset(Reason),
8789
}
8890

8991
impl State {
@@ -339,24 +341,29 @@ impl State {
339341
/// Set the stream state to a scheduled reset.
340342
pub fn set_scheduled_reset(&mut self, reason: Reason) {
341343
debug_assert!(!self.is_closed());
342-
self.inner = Closed(Cause::ScheduledLibraryReset(reason));
344+
self.inner = ScheduledReset(reason)
343345
}
344346

345347
pub fn get_scheduled_reset(&self) -> Option<Reason> {
346348
match self.inner {
347-
Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
349+
ScheduledReset(reason) => Some(reason),
348350
_ => None,
349351
}
350352
}
351353

352354
pub fn is_scheduled_reset(&self) -> bool {
353-
matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
355+
matches!(self.inner, ScheduledReset(_))
356+
}
357+
358+
pub fn did_schedule_reset(&mut self) {
359+
debug_assert!(self.is_scheduled_reset());
360+
self.inner = Closed(Cause::EndStream);
354361
}
355362

356363
pub fn is_local_error(&self) -> bool {
357364
match self.inner {
365+
ScheduledReset(_) => true,
358366
Closed(Cause::Error(ref e)) => e.is_local(),
359-
Closed(Cause::ScheduledLibraryReset(..)) => true,
360367
_ => false,
361368
}
362369
}
@@ -416,14 +423,14 @@ impl State {
416423
pub fn is_recv_closed(&self) -> bool {
417424
matches!(
418425
self.inner,
419-
Closed(..) | HalfClosedRemote(..) | ReservedLocal
426+
ScheduledReset(_) | Closed(..) | HalfClosedRemote(..) | ReservedLocal
420427
)
421428
}
422429

423430
pub fn is_send_closed(&self) -> bool {
424431
matches!(
425432
self.inner,
426-
Closed(..) | HalfClosedLocal(..) | ReservedRemote
433+
ScheduledReset(_) | Closed(..) | HalfClosedLocal(..) | ReservedRemote
427434
)
428435
}
429436

@@ -434,10 +441,8 @@ impl State {
434441
pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
435442
// TODO: Is this correct?
436443
match self.inner {
444+
ScheduledReset(reason) => Err(proto::Error::library_go_away(reason)),
437445
Closed(Cause::Error(ref e)) => Err(e.clone()),
438-
Closed(Cause::ScheduledLibraryReset(reason)) => {
439-
Err(proto::Error::library_go_away(reason))
440-
}
441446
Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
442447
_ => Ok(true),
443448
}
@@ -446,9 +451,9 @@ impl State {
446451
/// Returns a reason if the stream has been reset.
447452
pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
448453
match self.inner {
449-
Closed(Cause::Error(Error::Reset(_, reason, _)))
450-
| Closed(Cause::Error(Error::GoAway(_, reason, _)))
451-
| Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
454+
ScheduledReset(reason)
455+
| Closed(Cause::Error(Error::Reset(_, reason, _)))
456+
| Closed(Cause::Error(Error::GoAway(_, reason, _))) => Ok(Some(reason)),
452457
Closed(Cause::Error(ref e)) => Err(e.clone().into()),
453458
Open {
454459
local: Streaming, ..

tests/h2-tests/tests/stream_states.rs

+137
Original file line numberDiff line numberDiff line change
@@ -1218,3 +1218,140 @@ async fn reset_new_stream_before_send() {
12181218

12191219
join(srv, client).await;
12201220
}
1221+
1222+
#[tokio::test]
1223+
async fn explicit_reset_with_max_concurrent_stream() {
1224+
h2_support::trace_init!();
1225+
1226+
let (io, mut srv) = mock::new();
1227+
1228+
let mock = async move {
1229+
let settings = srv
1230+
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
1231+
.await;
1232+
assert_default_settings!(settings);
1233+
1234+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
1235+
.await;
1236+
srv.send_frame(frames::headers(1).response(200)).await;
1237+
1238+
srv.recv_frame(frames::reset(1).cancel()).await;
1239+
1240+
srv.recv_frame(
1241+
frames::headers(3)
1242+
.request("POST", "https://www.example.com/")
1243+
.eos(),
1244+
)
1245+
.await;
1246+
srv.send_frame(frames::headers(3).response(200)).await;
1247+
};
1248+
1249+
let h2 = async move {
1250+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
1251+
1252+
{
1253+
let request = Request::builder()
1254+
.method(Method::POST)
1255+
.uri("https://www.example.com/")
1256+
.body(())
1257+
.unwrap();
1258+
1259+
let (resp, mut stream) = client.send_request(request, false).unwrap();
1260+
1261+
{
1262+
let resp = h2.drive(resp).await.unwrap();
1263+
assert_eq!(resp.status(), StatusCode::OK);
1264+
}
1265+
1266+
stream.send_reset(Reason::CANCEL);
1267+
};
1268+
1269+
{
1270+
let request = Request::builder()
1271+
.method(Method::POST)
1272+
.uri("https://www.example.com/")
1273+
.body(())
1274+
.unwrap();
1275+
1276+
let (resp, _) = client.send_request(request, true).unwrap();
1277+
1278+
{
1279+
let resp = h2.drive(resp).await.unwrap();
1280+
assert_eq!(resp.status(), StatusCode::OK);
1281+
}
1282+
};
1283+
1284+
h2.await.unwrap();
1285+
};
1286+
1287+
join(mock, h2).await;
1288+
}
1289+
1290+
#[tokio::test]
1291+
async fn implicit_cancel_with_max_concurrent_stream() {
1292+
h2_support::trace_init!();
1293+
1294+
let (io, mut srv) = mock::new();
1295+
1296+
let mock = async move {
1297+
let settings = srv
1298+
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
1299+
.await;
1300+
assert_default_settings!(settings);
1301+
1302+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
1303+
.await;
1304+
srv.send_frame(frames::headers(1).response(200)).await;
1305+
1306+
srv.recv_frame(frames::reset(1).cancel()).await;
1307+
1308+
srv.recv_frame(
1309+
frames::headers(3)
1310+
.request("POST", "https://www.example.com/")
1311+
.eos(),
1312+
)
1313+
.await;
1314+
srv.send_frame(frames::headers(3).response(200)).await;
1315+
};
1316+
1317+
let h2 = async move {
1318+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
1319+
1320+
{
1321+
let request = Request::builder()
1322+
.method(Method::POST)
1323+
.uri("https://www.example.com/")
1324+
.body(())
1325+
.unwrap();
1326+
1327+
let (resp, stream) = client.send_request(request, false).unwrap();
1328+
1329+
{
1330+
let resp = h2.drive(resp).await.unwrap();
1331+
assert_eq!(resp.status(), StatusCode::OK);
1332+
}
1333+
1334+
// This implicitly resets the stream with CANCEL.
1335+
drop(stream);
1336+
};
1337+
1338+
{
1339+
let request = Request::builder()
1340+
.method(Method::POST)
1341+
.uri("https://www.example.com/")
1342+
.body(())
1343+
.unwrap();
1344+
1345+
let (resp, _) = client.send_request(request, true).unwrap();
1346+
1347+
{
1348+
let resp = h2.drive(resp).await.unwrap();
1349+
assert_eq!(resp.status(), StatusCode::OK);
1350+
}
1351+
};
1352+
1353+
h2.await.unwrap();
1354+
};
1355+
1356+
join(mock, h2).await;
1357+
}

0 commit comments

Comments
 (0)