From 54e6b66d4292ad0b38ac479b13c3a96776d23d08 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 1 Feb 2025 00:05:11 +0100 Subject: [PATCH 1/4] feat(iroh): allow for limiting incoming connections on the router --- README.md | 3 +- iroh/src/protocol.rs | 133 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 128 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 1798c8fa55..8ed2ec9ee8 100644 --- a/README.md +++ b/README.md @@ -94,9 +94,8 @@ let router = Router::builder(endpoint) struct Echo; impl ProtocolHandler for Echo { - fn accept(self: Arc, connecting: Connecting) -> BoxedFuture> { + fn accept(&self, connection: Connection) -> BoxedFuture> { Box::pin(async move { - let connection = connecting.await?; let (mut send, mut recv) = connection.accept_bi().await?; // Echo any bytes received back directly. diff --git a/iroh/src/protocol.rs b/iroh/src/protocol.rs index 574ec68918..47d37174f2 100644 --- a/iroh/src/protocol.rs +++ b/iroh/src/protocol.rs @@ -41,6 +41,7 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Result; +use iroh_base::NodeId; use n0_future::{ boxed::BoxFuture, join_all, @@ -50,7 +51,10 @@ use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::{error, info_span, trace, warn, Instrument}; -use crate::{endpoint::Connecting, Endpoint}; +use crate::{ + endpoint::{Connecting, Connection}, + Endpoint, +}; /// The built router. /// @@ -109,10 +113,18 @@ pub struct RouterBuilder { /// The protocol handler must then be registered on the node for an ALPN protocol with /// [`crate::protocol::RouterBuilder::accept`]. pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { + /// Optional interception point to handle the `Connecting` state. + fn on_connecting(&self, conn: Connecting) -> BoxFuture> { + Box::pin(async move { + let conn = conn.await?; + Ok(conn) + }) + } + /// Handle an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept(&self, conn: Connecting) -> BoxFuture>; + fn accept(&self, conn: Connection) -> BoxFuture>; /// Called when the node shuts down. fn shutdown(&self) -> BoxFuture<()> { @@ -121,7 +133,10 @@ pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { } impl ProtocolHandler for Arc { - fn accept(&self, conn: Connecting) -> BoxFuture> { + fn on_connecting(&self, conn: Connecting) -> BoxFuture> { + self.as_ref().on_connecting(conn) + } + fn accept(&self, conn: Connection) -> BoxFuture> { self.as_ref().accept(conn) } @@ -131,7 +146,10 @@ impl ProtocolHandler for Arc { } impl ProtocolHandler for Box { - fn accept(&self, conn: Connecting) -> BoxFuture> { + fn on_connecting(&self, conn: Connecting) -> BoxFuture> { + self.as_ref().on_connecting(conn) + } + fn accept(&self, conn: Connection) -> BoxFuture> { self.as_ref().accept(conn) } @@ -350,8 +368,62 @@ async fn handle_connection(incoming: crate::endpoint::Incoming, protocols: Arc

{ + if let Err(err) = handler.accept(connection).await { + warn!("Handling incoming connection ended with error: {err}"); + } + } + Err(err) => { + warn!("Handling incoming connecting ended with error: {err}"); + } + } +} + +/// Limit +#[derive(derive_more::Debug, Clone)] +pub struct AccessLimit { + proto: P, + #[debug("limiter")] + limiter: Arc bool + Send + Sync + 'static>, +} + +impl AccessLimit

{ + /// Create a new `AccessLimit`. + pub fn new(proto: P, limiter: F) -> Self + where + F: Fn(NodeId) -> bool + Send + Sync + 'static, + { + Self { + proto, + limiter: Arc::new(limiter), + } + } +} + +impl ProtocolHandler for AccessLimit

{ + fn on_connecting(&self, conn: Connecting) -> BoxFuture> { + self.proto.on_connecting(conn) + } + + fn accept(&self, conn: Connection) -> BoxFuture> { + println!("accepting limiter"); + let this = self.clone(); + Box::pin(async move { + let remote = conn.remote_node_id()?; + let is_allowed = (this.limiter)(remote); + dbg!(is_allowed); + if !is_allowed { + conn.close(0u32.into(), b"not allowed"); + anyhow::bail!("not allowed"); + } + this.proto.accept(conn).await?; + Ok(()) + }) + } + + fn shutdown(&self) -> BoxFuture<()> { + self.proto.shutdown() } } @@ -374,4 +446,53 @@ mod tests { Ok(()) } + + // The protocol definition: + #[derive(Debug, Clone)] + struct Echo; + + const ECHO_ALPN: &[u8] = b"/iroh/echo/1"; + + impl ProtocolHandler for Echo { + fn accept(&self, connection: Connection) -> BoxFuture> { + println!("accepting echo"); + Box::pin(async move { + let (mut send, mut recv) = connection.accept_bi().await?; + + // Echo any bytes received back directly. + let _bytes_sent = tokio::io::copy(&mut recv, &mut send).await?; + + send.finish()?; + connection.closed().await; + + Ok(()) + }) + } + } + #[tokio::test] + async fn test_limiter() -> Result<()> { + let e1 = Endpoint::builder().bind().await?; + // deny all access + let proto = AccessLimit::new(Echo, |_node_id| false); + let r1 = Router::builder(e1.clone()) + .accept(ECHO_ALPN, proto) + .spawn() + .await?; + + let addr1 = r1.endpoint().node_addr().await?; + + let e2 = Endpoint::builder().bind().await?; + + println!("connecting"); + let conn = e2.connect(addr1, ECHO_ALPN).await?; + + let (_send, mut recv) = conn.open_bi().await?; + let response = recv.read_to_end(1000).await.unwrap_err(); + assert!(format!("{:#?}", response).contains("not allowed")); + + r1.shutdown().await?; + e2.close().await; + + Ok(()) + } } From 97239a653ea7cf33dbb1e482497d1a48050fdf5d Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 10 Feb 2025 17:30:03 +0100 Subject: [PATCH 2/4] cleanup --- iroh/examples/echo.rs | 6 ++---- iroh/examples/search.rs | 6 ++---- iroh/src/protocol.rs | 13 +++++++------ 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/iroh/examples/echo.rs b/iroh/examples/echo.rs index 5da09096f0..8056672df7 100644 --- a/iroh/examples/echo.rs +++ b/iroh/examples/echo.rs @@ -8,7 +8,7 @@ use anyhow::Result; use iroh::{ - endpoint::Connecting, + endpoint::Connection, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr, }; @@ -71,11 +71,9 @@ impl ProtocolHandler for Echo { /// /// The returned future runs on a newly spawned tokio task, so it can run as long as /// the connection lasts. - fn accept(&self, connecting: Connecting) -> BoxFuture> { + fn accept(&self, connection: Connection) -> BoxFuture> { // We have to return a boxed future from the handler. Box::pin(async move { - // Wait for the connection to be fully established. - let connection = connecting.await?; // We can get the remote's node id from the connection. let node_id = connection.remote_node_id()?; println!("accepted connection from {node_id}"); diff --git a/iroh/examples/search.rs b/iroh/examples/search.rs index 1cd265dfa8..a715e590e4 100644 --- a/iroh/examples/search.rs +++ b/iroh/examples/search.rs @@ -34,7 +34,7 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; use clap::Parser; use iroh::{ - endpoint::Connecting, + endpoint::Connection, protocol::{ProtocolHandler, Router}, Endpoint, NodeId, }; @@ -127,12 +127,10 @@ impl ProtocolHandler for BlobSearch { /// /// The returned future runs on a newly spawned tokio task, so it can run as long as /// the connection lasts. - fn accept(&self, connecting: Connecting) -> BoxFuture> { + fn accept(&self, connection: Connection) -> BoxFuture> { let this = self.clone(); // We have to return a boxed future from the handler. Box::pin(async move { - // Wait for the connection to be fully established. - let connection = connecting.await?; // We can get the remote's node id from the connection. let node_id = connection.remote_node_id()?; println!("accepted connection from {node_id}"); diff --git a/iroh/src/protocol.rs b/iroh/src/protocol.rs index 47d37174f2..19b1923975 100644 --- a/iroh/src/protocol.rs +++ b/iroh/src/protocol.rs @@ -5,7 +5,7 @@ //! ```no_run //! # use anyhow::Result; //! # use futures_lite::future::Boxed as BoxedFuture; -//! # use iroh::{endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr}; +//! # use iroh::{endpoint::Connection, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr}; //! # //! # async fn test_compile() -> Result<()> { //! let endpoint = Endpoint::builder().discovery_n0().bind().await?; @@ -22,9 +22,8 @@ //! struct Echo; //! //! impl ProtocolHandler for Echo { -//! fn accept(&self, connecting: Connecting) -> BoxedFuture> { +//! fn accept(&self, connection: Connection) -> BoxedFuture> { //! Box::pin(async move { -//! let connection = connecting.await?; //! let (mut send, mut recv) = connection.accept_bi().await?; //! //! // Echo any bytes received back directly. @@ -114,9 +113,9 @@ pub struct RouterBuilder { /// [`crate::protocol::RouterBuilder::accept`]. pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { /// Optional interception point to handle the `Connecting` state. - fn on_connecting(&self, conn: Connecting) -> BoxFuture> { + fn on_connecting(&self, connecting: Connecting) -> BoxFuture> { Box::pin(async move { - let conn = conn.await?; + let conn = connecting.await?; Ok(conn) }) } @@ -124,7 +123,7 @@ pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { /// Handle an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept(&self, conn: Connection) -> BoxFuture>; + fn accept(&self, connection: Connection) -> BoxFuture>; /// Called when the node shuts down. fn shutdown(&self) -> BoxFuture<()> { @@ -136,6 +135,7 @@ impl ProtocolHandler for Arc { fn on_connecting(&self, conn: Connecting) -> BoxFuture> { self.as_ref().on_connecting(conn) } + fn accept(&self, conn: Connection) -> BoxFuture> { self.as_ref().accept(conn) } @@ -149,6 +149,7 @@ impl ProtocolHandler for Box { fn on_connecting(&self, conn: Connecting) -> BoxFuture> { self.as_ref().on_connecting(conn) } + fn accept(&self, conn: Connection) -> BoxFuture> { self.as_ref().accept(conn) } From 2ac95c9335850e619c7eb75aadea3c20c7bacde7 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 10 Feb 2025 17:32:16 +0100 Subject: [PATCH 3/4] improve docs --- iroh/src/protocol.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/iroh/src/protocol.rs b/iroh/src/protocol.rs index 19b1923975..01c7f97b93 100644 --- a/iroh/src/protocol.rs +++ b/iroh/src/protocol.rs @@ -381,7 +381,8 @@ async fn handle_connection(incoming: crate::endpoint::Incoming, protocols: Arc

{ proto: P, @@ -391,6 +392,9 @@ pub struct AccessLimit { impl AccessLimit

{ /// Create a new `AccessLimit`. + /// + /// The function should return `true` for nodes that are allowed to + /// connect, and `false` otherwise. pub fn new(proto: P, limiter: F) -> Self where F: Fn(NodeId) -> bool + Send + Sync + 'static, @@ -408,12 +412,10 @@ impl ProtocolHandler for AccessLimit

{ } fn accept(&self, conn: Connection) -> BoxFuture> { - println!("accepting limiter"); let this = self.clone(); Box::pin(async move { let remote = conn.remote_node_id()?; let is_allowed = (this.limiter)(remote); - dbg!(is_allowed); if !is_allowed { conn.close(0u32.into(), b"not allowed"); anyhow::bail!("not allowed"); From 8c927d8be44bf5a449e483bd87692dd000048996 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 11 Mar 2025 14:01:26 +0100 Subject: [PATCH 4/4] add more doc comments --- iroh/src/protocol.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/iroh/src/protocol.rs b/iroh/src/protocol.rs index 01c7f97b93..d386f4d5ec 100644 --- a/iroh/src/protocol.rs +++ b/iroh/src/protocol.rs @@ -113,6 +113,8 @@ pub struct RouterBuilder { /// [`crate::protocol::RouterBuilder::accept`]. pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { /// Optional interception point to handle the `Connecting` state. + /// + /// This enables accepting 0-RTT data from clients, among other things. fn on_connecting(&self, connecting: Connecting) -> BoxFuture> { Box::pin(async move { let conn = connecting.await?; @@ -383,6 +385,8 @@ async fn handle_connection(incoming: crate::endpoint::Incoming, protocols: Arc

{ proto: P,