Skip to content

fix(iroh): always ping new paths, add test for reconnecting with changed addrs #3372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3216,4 +3216,97 @@ mod tests {

Ok(())
}

/// Test that we can immediately reconnect after respawning an endpoint with the same node id.
///
/// Importantly, this test does not use a relay. This means there is no other path but the UDP
/// path available. The respawned endpoint will have different direct addrs from the previous
/// endpoint. The test attempts to ensure that this works, i.e. that the server endpoint will
/// reply to the new addresses and not keep sending to the old UDP addresses which won't be
/// received anymore.
#[tokio::test]
#[traced_test]
async fn can_abort_and_reconnect() -> Result {
    const TEST_ALPN: &[u8] = b"/iroh/test/1";
    const TIMEOUT: Duration = Duration::from_secs(10);

    // Deterministic RNG so the generated keys are stable across runs.
    // Bind the RNG itself, not a `&mut` to a temporary: `SecretKey::generate(&mut rng)`
    // then takes a plain `&mut ChaCha12Rng` instead of a `&mut &mut ChaCha12Rng`.
    let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);

    // Spawn a server endpoint. Relays are disabled so the only usable path is UDP.
    let server = Endpoint::builder()
        .secret_key(SecretKey::generate(&mut rng))
        .relay_mode(RelayMode::Disabled)
        .alpns(vec![TEST_ALPN.to_vec()])
        .bind()
        .await?;
    let server_addr = server.node_addr().initialized().await.e()?;

    // The server accepts all connections, waits for them to be closed, and sends the
    // close code over a channel so the test body can assert on it.
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    let server_loop = tokio::task::spawn(async move {
        while let Some(conn) = server.accept().await {
            let conn = conn.accept().e()?.await.e()?;
            info!("server ACCEPT");
            // Send a greeting so the client knows the connection is fully up.
            let mut stream = conn.open_uni().await.e()?;
            stream.write_all(b"hi").await.e()?;
            stream.finish().e()?;
            // Wait for the client to close; report the application close code,
            // or the raw close reason if it was not an application close.
            let res = match conn.closed().await {
                ConnectionError::ApplicationClosed(frame) => Ok(u64::from(frame.error_code)),
                reason => Err(reason),
            };
            info!("server CLOSED");
            tx.send(res).await.e()?;
        }
        Result::<_, n0_snafu::Error>::Ok(())
    });

    // Clients connect to the server, and immediately close the connection with a code
    // and then close the endpoint. Returns the endpoint so the caller controls when
    // it is shut down.
    async fn connect(secret_key: SecretKey, addr: NodeAddr, code: u32) -> Result<Endpoint> {
        info!("spawn client node {}", secret_key.public().fmt_short());
        let ep = Endpoint::builder()
            .secret_key(secret_key)
            .relay_mode(RelayMode::Disabled)
            .bind()
            .await?;
        let ipv4 = ep.bound_sockets()[0];
        let node_id = ep.node_id().fmt_short();
        info!(%node_id, %ipv4, "client CONNECT");
        let conn = ep.connect(addr, TEST_ALPN).await?;
        info!(%node_id, %ipv4, "client CONNECTED");
        // Read the server's greeting to confirm two-way communication works.
        let mut stream = conn.accept_uni().await.e()?;
        let buf = stream.read_to_end(2).await.e()?;
        assert_eq!(&buf, b"hi");
        conn.close(code.into(), b"bye");
        info!(%node_id, %ipv4, "client CLOSE");
        Ok(ep)
    }

    // Same node id for both connections: the second connection must work even though
    // the rebound endpoint has different direct addresses.
    let client_secret_key = SecretKey::generate(&mut rng);

    // First connection
    let ep = n0_future::time::timeout(
        TIMEOUT,
        connect(client_secret_key.clone(), server_addr.clone(), 23),
    )
    .await
    .e()??;
    assert_eq!(rx.recv().await.unwrap().unwrap(), 23);
    ep.close().await;

    // Second connection: same node id, fresh endpoint, new UDP addrs. The server must
    // reply to the new addresses rather than the stale ones from the first connection.
    let ep = n0_future::time::timeout(
        TIMEOUT,
        connect(client_secret_key.clone(), server_addr.clone(), 24),
    )
    .await
    .e()??;
    assert_eq!(rx.recv().await.unwrap().unwrap(), 24);
    ep.close().await;

    server_loop.abort();

    Ok(())
}
}
8 changes: 8 additions & 0 deletions iroh/src/magicsock/node_map/best_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,19 @@ impl BestAddr {
}
Some(state) => {
let candidate = AddrLatency { addr, latency };
// If the current address has exceeded its trust interval, or if the new address has lower latency,
// use the new address.
if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) {
self.insert(addr, latency, source, confirmed_at);
// If the new address is equal to the current address, mark it as reconfirmed now and expand its trust
// interval.
} else if state.addr.addr == addr {
state.confirmed_at = confirmed_at;
state.trust_until = Some(source.trust_until(confirmed_at));
// If we receive a pong on a different port but the same IP address as the current best addr,
// we assume that the endpoint has rebound, and thus use the new port.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main issue with this logic is that I find it fairly arbitrary. It fixes an artificial test case, but it seems like a sub-set of the cases that need to be fixed. We're also not really sure we can send on this path at this point, but I'm not even sure if that's something we're always sure of anyway.

} else if state.addr.addr.ip() == addr.ip() {
self.insert(addr, latency, source, confirmed_at)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions iroh/src/magicsock/node_map/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,12 +789,13 @@ impl NodeState {
self.prune_direct_addresses();
}

// if the endpoint does not yet have a best_addrr
// if the endpoint does not yet have a best_addr, or if this is a new path
let needs_ping_back = if matches!(path, SendAddr::Udp(_))
&& matches!(
&& (matches!(
self.udp_paths.best_addr.state(now),
best_addr::State::Empty | best_addr::State::Outdated(_)
) {
) || matches!(role, PingRole::NewPath))
{
// We also need to send a ping to make this path available to us as well. This
// is always sent together with a pong. So in the worst case the pong gets lost
// and this ping does not. In that case we ping-pong until both sides have
Expand Down
Loading