Skip to content

Commit f74b7c9

Browse files
committed
Implement RACK
This is a modified version of [RACK] for QUIC. * initial timeout at `9/8 RTT` instead of `5/4 RTT` to keep the behavior the same when no out-of-order packets arrive * reorder_window_mult adds fractions of `1/8 RTT` instead of `1/4 RTT` for no particular reason (except that it make the code more consistent to the initial timeout of `9/8 RTT` * out of order packets, that weren't causing spurious retransmits still reset the `reorder_window_persist` to 16. TCP doesn't have the necessary information to do this * `reorder_window_mult` is set high enough to prevent a spurious retransmit next time instead of just increasing by one. Can be done, because we have more RTT estimates for packets, that would have been spuriously retransmitted in TCP. [RACK]: https://datatracker.ietf.org/doc/html/rfc8985
1 parent b0eb12a commit f74b7c9

File tree

6 files changed

+113
-14
lines changed

6 files changed

+113
-14
lines changed

neqo-transport/src/cc/classic_cc.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,12 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
152152
}
153153

154154
// Multi-packet version of OnPacketAckedCC
155-
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
155+
fn on_packets_acked(
156+
&mut self,
157+
acked_pkts: &[SentPacket],
158+
min_rtt: Duration,
159+
now: Instant,
160+
) -> bool {
156161
// Check whether we are app limited before acked packets are removed
157162
// from bytes_in_flight.
158163
let is_app_limited = self.app_limited();
@@ -166,6 +171,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
166171
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
167172
);
168173

174+
let mut exiting_recovery = false;
169175
let mut new_acked = 0;
170176
for pkt in acked_pkts {
171177
qinfo!(
@@ -190,6 +196,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
190196

191197
if self.state.in_recovery() {
192198
self.set_state(State::CongestionAvoidance);
199+
exiting_recovery = true;
193200
qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]);
194201
}
195202

@@ -199,7 +206,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
199206
if is_app_limited {
200207
self.cc_algorithm.on_app_limited();
201208
qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
202-
return;
209+
return exiting_recovery;
203210
}
204211

205212
// Slow start, up to the slow start threshold.
@@ -250,6 +257,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
250257
],
251258
);
252259
qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
260+
exiting_recovery
253261
}
254262

255263
/// Update congestion controller state based on lost packets.

neqo-transport/src/cc/mod.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug {
4040
#[must_use]
4141
fn cwnd_avail(&self) -> usize;
4242

43-
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant);
43+
fn on_packets_acked(
44+
&mut self,
45+
acked_pkts: &[SentPacket],
46+
min_rtt: Duration,
47+
now: Instant,
48+
) -> bool;
4449

4550
/// Returns true if the congestion window was reduced.
4651
fn on_packets_lost(

neqo-transport/src/path.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -957,10 +957,10 @@ impl Path {
957957
}
958958

959959
/// Record packets as acknowledged with the sender.
960-
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) {
960+
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool {
961961
debug_assert!(self.is_primary());
962962
self.sender
963-
.on_packets_acked(acked_pkts, self.rtt.minimum(), now);
963+
.on_packets_acked(acked_pkts, self.rtt.minimum(), now)
964964
}
965965

966966
/// Record packets as lost with the sender.

neqo-transport/src/recovery.rs

+78-7
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace {
185185
/// This is `None` if there were no out-of-order packets detected.
186186
/// When set to `Some(T)`, time-based loss detection should be enabled.
187187
first_ooo_time: Option<Instant>,
188+
/// If no reordering has been observed, TODO: just say reo_wnd_mult != 0
189+
reordering_seen: bool,
190+
/// the RTO is RTT * (reo_wnd_mult + 9) / 8
191+
///
192+
/// this is basically the index of the first non-zero entry of `reo_wnd_persist`
193+
reorder_window_mult: u32,
194+
reorder_window_persist: u32,
188195
}
189196

190197
impl LossRecoverySpace {
@@ -197,6 +204,9 @@ impl LossRecoverySpace {
197204
in_flight_outstanding: 0,
198205
sent_packets: BTreeMap::default(),
199206
first_ooo_time: None,
207+
reorder_window_mult: 0,
208+
reorder_window_persist: 0,
209+
reordering_seen: false,
200210
}
201211
}
202212

@@ -384,18 +394,20 @@ impl LossRecoverySpace {
384394
pub fn detect_lost_packets(
385395
&mut self,
386396
now: Instant,
387-
loss_delay: Duration,
397+
rtt_estimate: Duration,
388398
cleanup_delay: Duration,
389399
lost_packets: &mut Vec<SentPacket>,
390400
) {
391401
// Housekeeping.
392402
self.remove_old_lost(now, cleanup_delay);
393403

404+
let loss_delay = rtt_estimate * (self.reorder_window_mult + 9) / 8;
394405
qtrace!(
395-
"detect lost {}: now={:?} delay={:?}",
406+
"detect lost {}: now={:?} delay={:?}, multiplier={}",
396407
self.space,
397408
now,
398409
loss_delay,
410+
self.reorder_window_mult
399411
);
400412
self.first_ooo_time = None;
401413

@@ -418,7 +430,7 @@ impl LossRecoverySpace {
418430
packet.time_sent,
419431
loss_delay
420432
);
421-
} else if largest_acked >= Some(*pn + PACKET_THRESHOLD) {
433+
} else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) {
422434
qtrace!(
423435
"lost={}, is >= {} from largest acked {:?}",
424436
pn,
@@ -438,6 +450,60 @@ impl LossRecoverySpace {
438450

439451
lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone()));
440452
}
453+
454+
pub fn detect_reordered_packets(
455+
&mut self,
456+
now: Instant,
457+
acked_pkts: &[SentPacket],
458+
rtt_estimate: Duration,
459+
) {
460+
// detect packet reordering
461+
let mut max_rtt = Duration::default();
462+
if let Some(largest_ack) = self.largest_acked {
463+
for pkt in acked_pkts
464+
.iter()
465+
.filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack)
466+
{
467+
qinfo!("detect_reordered_packets largest_ack={}, pn={}", largest_ack, pkt.pn);
468+
// reordering event
469+
self.reordering_seen = true;
470+
max_rtt = max(max_rtt, now.duration_since(pkt.time_sent));
471+
}
472+
}
473+
// update reo_wnd
474+
if max_rtt > rtt_estimate && !rtt_estimate.is_zero() {
475+
// calculate reo_wnd necessary to accept the reordering event
476+
// inverse of
477+
// lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
478+
// <=> self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9
479+
let multiplier = min(
480+
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
481+
8,
482+
);
483+
let multiplier = u32::try_from(multiplier).unwrap();
484+
qinfo!(
485+
"detect_reordered_packets max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}",
486+
max_rtt.as_micros(),
487+
rtt_estimate.as_micros(),
488+
(rtt_estimate * (self.reorder_window_mult + 9) / 8).as_micros(),
489+
(rtt_estimate * (multiplier + 9) / 8).as_micros()
490+
);
491+
self.reorder_window_mult = max(self.reorder_window_mult, multiplier);
492+
}
493+
}
494+
495+
pub fn on_exiting_recovery(&mut self) {
496+
if self.reorder_window_persist != 0 {
497+
self.reorder_window_persist -= 1;
498+
if self.reorder_window_persist == 0 {
499+
self.reorder_window_mult = 0;
500+
}
501+
}
502+
qinfo!(
503+
"on_exiting_recovery reorder_window_persist={}, reorder_window_mult={}",
504+
self.reorder_window_persist, self.reorder_window_mult
505+
);
506+
}
441507
}
442508

443509
#[derive(Debug)]
@@ -680,6 +746,9 @@ impl LossRecovery {
680746
return (Vec::new(), Vec::new());
681747
}
682748

749+
let rtt_estimate = primary_path.borrow().rtt().estimated_upper();
750+
space.detect_reordered_packets(now, &acked_packets, rtt_estimate);
751+
683752
// Track largest PN acked per space
684753
let prev_largest_acked = space.largest_acked_sent_time;
685754
if Some(largest_acked) > space.largest_acked {
@@ -704,12 +773,11 @@ impl LossRecovery {
704773
// We need to ensure that we have sent any PTO probes before they are removed
705774
// as we rely on the count of in-flight packets to determine whether to send
706775
// another probe. Removing them too soon would result in not sending on PTO.
707-
let loss_delay = primary_path.borrow().rtt().loss_delay();
708776
let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space);
709777
let mut lost = Vec::new();
710778
self.spaces.get_mut(pn_space).unwrap().detect_lost_packets(
711779
now,
712-
loss_delay,
780+
rtt_estimate,
713781
cleanup_delay,
714782
&mut lost,
715783
);
@@ -725,9 +793,12 @@ impl LossRecovery {
725793
// This must happen after on_packets_lost. If in recovery, this could
726794
// take us out, and then lost packets will start a new recovery period
727795
// when it shouldn't.
728-
primary_path
796+
if primary_path
729797
.borrow_mut()
730-
.on_packets_acked(&acked_packets, now);
798+
.on_packets_acked(&acked_packets, now)
799+
{
800+
self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery();
801+
}
731802

732803
self.pto_state = None;
733804

neqo-transport/src/rtt.rs

+10
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,16 @@ impl RttEstimate {
146146
max(rtt * 9 / 8, GRANULARITY)
147147
}
148148

149+
/// Frin RFC9002 Section 6.1.2 Time Treshhold
150+
/// Using `max(smoothed_rtt, latest_rtt)` protects from the two following cases:
151+
// * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the
152+
// acknowledgment encountered a shorter path;
153+
// * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained
154+
// increase in the actual RTT, but the smoothed RTT has not yet caught up.
155+
pub fn estimated_upper(&self) -> Duration {
156+
max(self.latest_rtt, self.smoothed_rtt)
157+
}
158+
149159
pub fn first_sample_time(&self) -> Option<Instant> {
150160
self.first_sample_time
151161
}

neqo-transport/src/sender.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,13 @@ impl PacketSender {
6363
self.cc.cwnd_avail()
6464
}
6565

66-
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
67-
self.cc.on_packets_acked(acked_pkts, min_rtt, now);
66+
pub fn on_packets_acked(
67+
&mut self,
68+
acked_pkts: &[SentPacket],
69+
min_rtt: Duration,
70+
now: Instant,
71+
) -> bool {
72+
self.cc.on_packets_acked(acked_pkts, min_rtt, now)
6873
}
6974

7075
/// Called when packets are lost. Returns true if the congestion window was reduced.

0 commit comments

Comments
 (0)