|
3 | 3 | use std::{net::SocketAddr, sync::Arc};
|
4 | 4 |
|
5 | 5 | use anyhow::Result;
|
| 6 | +use n0_future::time::Duration; |
6 | 7 | use quinn::{crypto::rustls::QuicClientConfig, VarInt};
|
7 | 8 |
|
8 | 9 | /// ALPN for our quic addr discovery
|
@@ -212,6 +213,17 @@ impl QuicClient {
|
212 | 213 |
|
213 | 214 | // enable the receive side of address discovery
|
214 | 215 | let mut transport = quinn_proto::TransportConfig::default();
|
| 216 | + // Setting the initial RTT estimate to a low value means |
| 217 | + // we're sacrificing initial throughput, which is fine for |
| 218 | + // QAD, which doesn't require us to have good initial throughput. |
| 219 | + // It also implies a 999ms probe timeout, which means that |
| 220 | + // if the packet gets lots (e.g. because we're probing ipv6, but |
| 221 | + // ipv6 packets always get lost in our network configuration) we |
| 222 | + // time out *closing the connection* after only 999ms. |
| 223 | + // Even if the round trip time is bigger than 999ms, this doesn't |
| 224 | + // prevent us from connecting, since that's dependent on the idle |
| 225 | + // timeout (set to 30s by default). |
| 226 | + transport.initial_rtt(Duration::from_millis(111)); |
215 | 227 | transport.receive_observed_address_reports(true);
|
216 | 228 | client_config.transport_config(Arc::new(transport));
|
217 | 229 |
|
@@ -272,7 +284,13 @@ impl QuicClient {
|
272 | 284 | mod tests {
|
273 | 285 | use std::net::Ipv4Addr;
|
274 | 286 |
|
| 287 | + use anyhow::Context; |
| 288 | + use n0_future::{task::AbortOnDropHandle, time}; |
| 289 | + use quinn::crypto::rustls::QuicServerConfig; |
| 290 | + use tokio::time::Instant; |
| 291 | + use tracing::{debug, info, info_span, Instrument}; |
275 | 292 | use tracing_test::traced_test;
|
| 293 | + use webpki::types::PrivatePkcs8KeyDer; |
276 | 294 |
|
277 | 295 | use super::{
|
278 | 296 | server::{QuicConfig, QuicServer},
|
@@ -312,4 +330,133 @@ mod tests {
|
312 | 330 | assert_eq!(client_addr, addr);
|
313 | 331 | Ok(())
|
314 | 332 | }
|
| 333 | + |
| 334 | + #[tokio::test] |
| 335 | + #[traced_test] |
| 336 | + async fn test_qad_client_closes_unresponsive_fast() -> anyhow::Result<()> { |
| 337 | + // create a client-side endpoint |
| 338 | + let client_endpoint = |
| 339 | + quinn::Endpoint::client(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))?; |
| 340 | + |
| 341 | + // create an socket that does not respond. |
| 342 | + let server_socket = |
| 343 | + tokio::net::UdpSocket::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).await?; |
| 344 | + let server_addr = server_socket.local_addr()?; |
| 345 | + |
| 346 | + // create the client configuration used for the client endpoint when they |
| 347 | + // initiate a connection with the server |
| 348 | + let client_config = crate::client::make_dangerous_client_config(); |
| 349 | + let quic_client = QuicClient::new(client_endpoint.clone(), client_config)?; |
| 350 | + |
| 351 | + // Start a connection attempt with nirvana - this will fail |
| 352 | + let task = AbortOnDropHandle::new(tokio::spawn({ |
| 353 | + async move { |
| 354 | + quic_client |
| 355 | + .get_addr_and_latency(server_addr, "localhost") |
| 356 | + .await |
| 357 | + } |
| 358 | + })); |
| 359 | + |
| 360 | + // Even if we wait longer than the probe timeout, we will still be attempting to connect: |
| 361 | + tokio::time::sleep(Duration::from_millis(1000)).await; |
| 362 | + assert!(!task.is_finished()); |
| 363 | + |
| 364 | + // time the closing of the client endpoint |
| 365 | + let before = Instant::now(); |
| 366 | + client_endpoint.close(0u32.into(), b"byeeeee"); |
| 367 | + client_endpoint.wait_idle().await; |
| 368 | + let time = Instant::now().duration_since(before); |
| 369 | + |
| 370 | + println!("Closed in {time:?}"); |
| 371 | + assert!(Duration::from_millis(900) < time); |
| 372 | + assert!(time < Duration::from_millis(1100)); |
| 373 | + |
| 374 | + Ok(()) |
| 375 | + } |
| 376 | + |
| 377 | + /// Makes sure that, even though the RTT was set to some fairly low value, |
| 378 | + /// we *do* try to connect for longer than what the time out would be after closing |
| 379 | + /// the connection, when we *don't* close the connection. |
| 380 | + /// |
| 381 | + /// In this case we don't simulate it via synthetically high RTT, but by dropping |
| 382 | + /// all packets on the server-side for 2 seconds. |
| 383 | + #[tokio::test] |
| 384 | + #[traced_test] |
| 385 | + async fn test_qad_connect_delayed() -> anyhow::Result<()> { |
| 386 | + // Create a socket for our QAD server. We need the socket separately because we |
| 387 | + // need to pop off messages before we attach it to the Quinn Endpoint. |
| 388 | + let socket = |
| 389 | + tokio::net::UdpSocket::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).await?; |
| 390 | + let server_addr = socket.local_addr()?; |
| 391 | + info!(addr = ?server_addr, "server socket bound"); |
| 392 | + |
| 393 | + // Create a QAD server with a self-signed cert, all manually. |
| 394 | + let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?; |
| 395 | + let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); |
| 396 | + let mut server_crypto = rustls::ServerConfig::builder() |
| 397 | + .with_no_client_auth() |
| 398 | + .with_single_cert(vec![cert.cert.into()], key.into())?; |
| 399 | + server_crypto.key_log = Arc::new(rustls::KeyLogFile::new()); |
| 400 | + server_crypto.alpn_protocols = vec![ALPN_QUIC_ADDR_DISC.to_vec()]; |
| 401 | + let mut server_config = |
| 402 | + quinn::ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(server_crypto)?)); |
| 403 | + let transport_config = Arc::get_mut(&mut server_config.transport).unwrap(); |
| 404 | + transport_config.send_observed_address_reports(true); |
| 405 | + |
| 406 | + let start = Instant::now(); |
| 407 | + let server_task = tokio::spawn( |
| 408 | + async move { |
| 409 | + info!("Dropping all packets"); |
| 410 | + time::timeout(Duration::from_secs(2), async { |
| 411 | + let mut buf = [0u8; 1500]; |
| 412 | + loop { |
| 413 | + let (len, src) = socket.recv_from(&mut buf).await.unwrap(); |
| 414 | + debug!(%len, ?src, "Dropped a packet"); |
| 415 | + } |
| 416 | + }) |
| 417 | + .await |
| 418 | + .ok(); |
| 419 | + info!("starting server"); |
| 420 | + let server = quinn::Endpoint::new( |
| 421 | + Default::default(), |
| 422 | + Some(server_config), |
| 423 | + socket.into_std()?, |
| 424 | + Arc::new(quinn::TokioRuntime), |
| 425 | + )?; |
| 426 | + info!("accepting conn"); |
| 427 | + let incoming = server.accept().await.context("no conn")?; |
| 428 | + info!("incoming!"); |
| 429 | + let conn = incoming.await?; |
| 430 | + conn.closed().await; |
| 431 | + server.wait_idle().await; |
| 432 | + Ok::<(), anyhow::Error>(()) |
| 433 | + } |
| 434 | + .instrument(info_span!("server")), |
| 435 | + ); |
| 436 | + let server_task = AbortOnDropHandle::new(server_task); |
| 437 | + |
| 438 | + info!("starting client"); |
| 439 | + let client_endpoint = |
| 440 | + quinn::Endpoint::client(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))?; |
| 441 | + |
| 442 | + // create the client configuration used for the client endpoint when they |
| 443 | + // initiate a connection with the server |
| 444 | + let client_config = crate::client::make_dangerous_client_config(); |
| 445 | + let quic_client = QuicClient::new(client_endpoint.clone(), client_config)?; |
| 446 | + |
| 447 | + // Now we should still connect, but it should take more than 1s. |
| 448 | + info!("making QAD request"); |
| 449 | + let (addr, latency) = time::timeout( |
| 450 | + Duration::from_secs(10), |
| 451 | + quic_client.get_addr_and_latency(server_addr, "localhost"), |
| 452 | + ) |
| 453 | + .await??; |
| 454 | + let duration = start.elapsed(); |
| 455 | + info!(?duration, ?addr, ?latency, "QAD succeeded"); |
| 456 | + assert!(duration >= Duration::from_secs(1)); |
| 457 | + |
| 458 | + time::timeout(Duration::from_secs(10), server_task).await???; |
| 459 | + |
| 460 | + Ok(()) |
| 461 | + } |
315 | 462 | }
|
0 commit comments