Skip to content

Commit a4ac438

Browse files
committed
Implement RACK
This is a modified version of [RACK] for QUIC. [RACK]: https://datatracker.ietf.org/doc/html/rfc8985
1 parent 887d256 commit a4ac438

File tree

6 files changed

+124
-14
lines changed

6 files changed

+124
-14
lines changed

neqo-transport/src/cc/classic_cc.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,12 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
149149
}
150150

151151
// Multi-packet version of OnPacketAckedCC
152-
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
152+
fn on_packets_acked(
153+
&mut self,
154+
acked_pkts: &[SentPacket],
155+
min_rtt: Duration,
156+
now: Instant,
157+
) -> bool {
153158
// Check whether we are app limited before acked packets are removed
154159
// from bytes_in_flight.
155160
let is_app_limited = self.app_limited();
@@ -163,6 +168,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
163168
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
164169
);
165170

171+
let mut exiting_recovery = false;
166172
let mut new_acked = 0;
167173
for pkt in acked_pkts {
168174
qinfo!(
@@ -187,6 +193,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
187193

188194
if self.state.in_recovery() {
189195
self.set_state(State::CongestionAvoidance);
196+
exiting_recovery = true;
190197
qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]);
191198
}
192199

@@ -196,7 +203,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
196203
if is_app_limited {
197204
self.cc_algorithm.on_app_limited();
198205
qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
199-
return;
206+
return exiting_recovery;
200207
}
201208

202209
// Slow start, up to the slow start threshold.
@@ -247,6 +254,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
247254
],
248255
);
249256
qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
257+
exiting_recovery
250258
}
251259

252260
/// Update congestion controller state based on lost packets.

neqo-transport/src/cc/mod.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug {
4040
#[must_use]
4141
fn cwnd_avail(&self) -> usize;
4242

43-
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant);
43+
fn on_packets_acked(
44+
&mut self,
45+
acked_pkts: &[SentPacket],
46+
min_rtt: Duration,
47+
now: Instant,
48+
) -> bool;
4449

4550
/// Returns true if the congestion window was reduced.
4651
fn on_packets_lost(

neqo-transport/src/path.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -957,10 +957,10 @@ impl Path {
957957
}
958958

959959
/// Record packets as acknowledged with the sender.
960-
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) {
960+
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool {
961961
debug_assert!(self.is_primary());
962962
self.sender
963-
.on_packets_acked(acked_pkts, self.rtt.minimum(), now);
963+
.on_packets_acked(acked_pkts, self.rtt.minimum(), now)
964964
}
965965

966966
/// Record packets as lost with the sender.

neqo-transport/src/recovery.rs

+89-7
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace {
185185
/// This is `None` if there were no out-of-order packets detected.
186186
/// When set to `Some(T)`, time-based loss detection should be enabled.
187187
first_ooo_time: Option<Instant>,
188+
/// If no reordering has been observed, TODO: just say reo_wnd_mult != 0
189+
reordering_seen: bool,
190+
/// the RTO is RTT * (reo_wnd_mult + 9) / 8
191+
///
192+
/// this is basically the index of the first non-zero entry of `reo_wnd_persist`
193+
reo_wnd_mult: u32,
194+
reo_wnd_persist: [u8; 16],
188195
}
189196

190197
impl LossRecoverySpace {
@@ -197,6 +204,9 @@ impl LossRecoverySpace {
197204
in_flight_outstanding: 0,
198205
sent_packets: BTreeMap::default(),
199206
first_ooo_time: None,
207+
reo_wnd_mult: 0,
208+
reo_wnd_persist: Default::default(),
209+
reordering_seen: false,
200210
}
201211
}
202212

@@ -384,18 +394,20 @@ impl LossRecoverySpace {
384394
pub fn detect_lost_packets(
385395
&mut self,
386396
now: Instant,
387-
loss_delay: Duration,
397+
rtt_estimate: Duration,
388398
cleanup_delay: Duration,
389399
lost_packets: &mut Vec<SentPacket>,
390400
) {
391401
// Housekeeping.
392402
self.remove_old_lost(now, cleanup_delay);
393403

404+
let loss_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
394405
qtrace!(
395-
"detect lost {}: now={:?} delay={:?}",
406+
"detect lost {}: now={:?} delay={:?}, multiplier={}",
396407
self.space,
397408
now,
398409
loss_delay,
410+
self.reo_wnd_mult
399411
);
400412
self.first_ooo_time = None;
401413

@@ -418,7 +430,7 @@ impl LossRecoverySpace {
418430
packet.time_sent,
419431
loss_delay
420432
);
421-
} else if largest_acked >= Some(*pn + PACKET_THRESHOLD) {
433+
} else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) {
422434
qtrace!(
423435
"lost={}, is >= {} from largest acked {:?}",
424436
pn,
@@ -438,6 +450,71 @@ impl LossRecoverySpace {
438450

439451
lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone()));
440452
}
453+
454+
pub fn detect_reordered_packets(
455+
&mut self,
456+
now: Instant,
457+
acked_pkts: &[SentPacket],
458+
rtt_estimate: Duration,
459+
) {
460+
// detect packet reordering
461+
let mut max_rtt = Duration::default();
462+
if let Some(largest_ack) = self.largest_acked {
463+
for pkt in acked_pkts
464+
.iter()
465+
.filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack)
466+
{
467+
println!("largest_ack={}, pn={}", largest_ack, pkt.pn);
468+
// reordering event
469+
self.reordering_seen = true;
470+
max_rtt = max(max_rtt, now.duration_since(pkt.time_sent));
471+
}
472+
}
473+
// update reo_wnd
474+
if max_rtt > rtt_estimate && !rtt_estimate.is_zero() {
475+
// calculate reo_wnd necessary to accept the reordering event
476+
477+
// inverse of lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
478+
// self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9
479+
let new_reo_wnd = min(
480+
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
481+
self.reo_wnd_persist.len() as u128,
482+
);
483+
let new_reo_wnd = usize::try_from(new_reo_wnd).unwrap();
484+
for el in 0..new_reo_wnd {
485+
self.reo_wnd_persist[el] = 16;
486+
}
487+
println!(
488+
"max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}",
489+
max_rtt.as_millis(),
490+
rtt_estimate.as_millis(),
491+
(rtt_estimate * (self.reo_wnd_mult + 9) / 8).as_millis(),
492+
(rtt_estimate * (new_reo_wnd as u32 + 9) / 8).as_millis()
493+
);
494+
println!(
495+
"detect_reordered_packets old={}, new={}, reo_wnd_persist={:?}",
496+
self.reo_wnd_mult,
497+
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
498+
self.reo_wnd_persist
499+
);
500+
self.reo_wnd_mult = max(self.reo_wnd_mult, u32::try_from(new_reo_wnd).unwrap());
501+
}
502+
}
503+
504+
pub fn on_exiting_recovery(&mut self) {
505+
let old = self.reo_wnd_mult;
506+
for (i, el) in self.reo_wnd_persist.iter_mut().enumerate() {
507+
if *el == 0 {
508+
self.reo_wnd_mult = u32::try_from(i).unwrap();
509+
break;
510+
}
511+
*el = el.saturating_sub(1);
512+
}
513+
println!(
514+
"detect_lost_packets old={}, new={}, reo_wnd_persist={:?}",
515+
old, self.reo_wnd_mult, self.reo_wnd_persist
516+
);
517+
}
441518
}
442519

443520
#[derive(Debug)]
@@ -680,6 +757,9 @@ impl LossRecovery {
680757
return (Vec::new(), Vec::new());
681758
}
682759

760+
let rtt_estimate = primary_path.borrow().rtt().estimated_upper();
761+
space.detect_reordered_packets(now, &acked_packets, rtt_estimate);
762+
683763
// Track largest PN acked per space
684764
let prev_largest_acked = space.largest_acked_sent_time;
685765
if Some(largest_acked) > space.largest_acked {
@@ -704,12 +784,11 @@ impl LossRecovery {
704784
// We need to ensure that we have sent any PTO probes before they are removed
705785
// as we rely on the count of in-flight packets to determine whether to send
706786
// another probe. Removing them too soon would result in not sending on PTO.
707-
let loss_delay = primary_path.borrow().rtt().loss_delay();
708787
let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space);
709788
let mut lost = Vec::new();
710789
self.spaces.get_mut(pn_space).unwrap().detect_lost_packets(
711790
now,
712-
loss_delay,
791+
rtt_estimate,
713792
cleanup_delay,
714793
&mut lost,
715794
);
@@ -725,9 +804,12 @@ impl LossRecovery {
725804
// This must happen after on_packets_lost. If in recovery, this could
726805
// take us out, and then lost packets will start a new recovery period
727806
// when it shouldn't.
728-
primary_path
807+
if primary_path
729808
.borrow_mut()
730-
.on_packets_acked(&acked_packets, now);
809+
.on_packets_acked(&acked_packets, now)
810+
{
811+
self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery();
812+
}
731813

732814
self.pto_state = None;
733815

neqo-transport/src/rtt.rs

+10
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,16 @@ impl RttEstimate {
146146
max(rtt * 9 / 8, GRANULARITY)
147147
}
148148

149+
/// Frin RFC9002 Section 6.1.2 Time Treshhold
150+
/// Using max(smoothed_rtt, latest_rtt) protects from the two following cases:
151+
// * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the
152+
// acknowledgment encountered a shorter path;
153+
// * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained
154+
// increase in the actual RTT, but the smoothed RTT has not yet caught up.
155+
pub fn estimated_upper(&self) -> Duration {
156+
max(self.latest_rtt, self.smoothed_rtt)
157+
}
158+
149159
pub fn first_sample_time(&self) -> Option<Instant> {
150160
self.first_sample_time
151161
}

neqo-transport/src/sender.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,13 @@ impl PacketSender {
6363
self.cc.cwnd_avail()
6464
}
6565

66-
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
67-
self.cc.on_packets_acked(acked_pkts, min_rtt, now);
66+
pub fn on_packets_acked(
67+
&mut self,
68+
acked_pkts: &[SentPacket],
69+
min_rtt: Duration,
70+
now: Instant,
71+
) -> bool {
72+
self.cc.on_packets_acked(acked_pkts, min_rtt, now)
6873
}
6974

7075
/// Called when packets are lost. Returns true if the congestion window was reduced.

0 commit comments

Comments
 (0)