Skip to content

feat(iroh)!: allow for limiting incoming connections on the router #3157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ let router = Router::builder(endpoint)
struct Echo;

impl ProtocolHandler for Echo {
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
fn accept(&self, connection: Connection) -> BoxedFuture<Result<()>> {
Box::pin(async move {
let connection = connecting.await?;
let (mut send, mut recv) = connection.accept_bi().await?;

// Echo any bytes received back directly.
Expand Down
6 changes: 2 additions & 4 deletions iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use anyhow::Result;
use iroh::{
endpoint::Connecting,
endpoint::Connection,
protocol::{ProtocolHandler, Router},
Endpoint, NodeAddr,
};
Expand Down Expand Up @@ -73,11 +73,9 @@ impl ProtocolHandler for Echo {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(&self, connecting: Connecting) -> BoxFuture<Result<()>> {
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
// We have to return a boxed future from the handler.
Box::pin(async move {
// Wait for the connection to be fully established.
let connection = connecting.await?;
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");
Expand Down
6 changes: 2 additions & 4 deletions iroh/examples/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{collections::BTreeSet, sync::Arc};
use anyhow::Result;
use clap::Parser;
use iroh::{
endpoint::Connecting,
endpoint::Connection,
protocol::{ProtocolHandler, Router},
Endpoint, NodeId,
};
Expand Down Expand Up @@ -127,12 +127,10 @@ impl ProtocolHandler for BlobSearch {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(&self, connecting: Connecting) -> BoxFuture<Result<()>> {
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
let this = self.clone();
// We have to return a boxed future from the handler.
Box::pin(async move {
// Wait for the connection to be fully established.
let connection = connecting.await?;
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");
Expand Down
142 changes: 133 additions & 9 deletions iroh/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```no_run
//! # use anyhow::Result;
//! # use futures_lite::future::Boxed as BoxedFuture;
//! # use iroh::{endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr};
//! # use iroh::{endpoint::Connection, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr};
//! #
//! # async fn test_compile() -> Result<()> {
//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
Expand All @@ -22,9 +22,8 @@
//! struct Echo;
//!
//! impl ProtocolHandler for Echo {
//! fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> {
//! fn accept(&self, connection: Connection) -> BoxedFuture<Result<()>> {
//! Box::pin(async move {
//! let connection = connecting.await?;
//! let (mut send, mut recv) = connection.accept_bi().await?;
//!
//! // Echo any bytes received back directly.
Expand All @@ -41,6 +40,7 @@
use std::{collections::BTreeMap, sync::Arc};

use anyhow::Result;
use iroh_base::NodeId;
use n0_future::{
boxed::BoxFuture,
join_all,
Expand All @@ -50,7 +50,10 @@ use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span, trace, warn, Instrument};

use crate::{endpoint::Connecting, Endpoint};
use crate::{
endpoint::{Connecting, Connection},
Endpoint,
};

/// The built router.
///
Expand Down Expand Up @@ -109,10 +112,18 @@ pub struct RouterBuilder {
/// The protocol handler must then be registered on the node for an ALPN protocol with
/// [`crate::protocol::RouterBuilder::accept`].
pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static {
/// Optional interception point to handle the `Connecting` state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always a fan of being extremely verbose on the docs. I would add a paragraph suggesting when you might to use this:

This enables accepting 0-RTT data from clients, among other things.

fn on_connecting(&self, connecting: Connecting) -> BoxFuture<Result<Connection>> {
Box::pin(async move {
let conn = connecting.await?;
Ok(conn)
})
}

/// Handle an incoming connection.
///
/// This runs on a freshly spawned tokio task so this can be long-running.
fn accept(&self, conn: Connecting) -> BoxFuture<Result<()>>;
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>>;

/// Called when the node shuts down.
fn shutdown(&self) -> BoxFuture<()> {
Expand All @@ -121,7 +132,11 @@ pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static {
}

impl<T: ProtocolHandler> ProtocolHandler for Arc<T> {
fn accept(&self, conn: Connecting) -> BoxFuture<Result<()>> {
fn on_connecting(&self, conn: Connecting) -> BoxFuture<Result<Connection>> {
self.as_ref().on_connecting(conn)
}

fn accept(&self, conn: Connection) -> BoxFuture<Result<()>> {
self.as_ref().accept(conn)
}

Expand All @@ -131,7 +146,11 @@ impl<T: ProtocolHandler> ProtocolHandler for Arc<T> {
}

impl<T: ProtocolHandler> ProtocolHandler for Box<T> {
fn accept(&self, conn: Connecting) -> BoxFuture<Result<()>> {
fn on_connecting(&self, conn: Connecting) -> BoxFuture<Result<Connection>> {
self.as_ref().on_connecting(conn)
}

fn accept(&self, conn: Connection) -> BoxFuture<Result<()>> {
self.as_ref().accept(conn)
}

Expand Down Expand Up @@ -350,8 +369,64 @@ async fn handle_connection(incoming: crate::endpoint::Incoming, protocols: Arc<P
warn!("Ignoring connection: unsupported ALPN protocol");
return;
};
if let Err(err) = handler.accept(connecting).await {
warn!("Handling incoming connection ended with error: {err}");
match handler.on_connecting(connecting).await {
Ok(connection) => {
if let Err(err) = handler.accept(connection).await {
warn!("Handling incoming connection ended with error: {err}");
}
}
Err(err) => {
warn!("Handling incoming connecting ended with error: {err}");
}
}
}

/// Wraps an existing protocol, limiting its access,
/// based on the provided function.
#[derive(derive_more::Debug, Clone)]
pub struct AccessLimit<P: ProtocolHandler + Clone> {
proto: P,
#[debug("limiter")]
limiter: Arc<dyn Fn(NodeId) -> bool + Send + Sync + 'static>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this was discussed before: will we get folks upset that they can't do async stuff here? I think this itself runs in an async context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course we/they can just make an AsyncAcessLimit which is probably nicer than forcing everyone to use async.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flub @dignifiedquire

How would you feel if I created a PR for AsyncAccessLimit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, why not. Don't think there'd be an objection to this.

}

impl<P: ProtocolHandler + Clone> AccessLimit<P> {
/// Create a new `AccessLimit`.
///
/// The function should return `true` for nodes that are allowed to
/// connect, and `false` otherwise.
pub fn new<F>(proto: P, limiter: F) -> Self
where
F: Fn(NodeId) -> bool + Send + Sync + 'static,
{
Self {
proto,
limiter: Arc::new(limiter),
}
}
}

impl<P: ProtocolHandler + Clone> ProtocolHandler for AccessLimit<P> {
fn on_connecting(&self, conn: Connecting) -> BoxFuture<Result<Connection>> {
self.proto.on_connecting(conn)
}

fn accept(&self, conn: Connection) -> BoxFuture<Result<()>> {
let this = self.clone();
Box::pin(async move {
let remote = conn.remote_node_id()?;
let is_allowed = (this.limiter)(remote);
if !is_allowed {
conn.close(0u32.into(), b"not allowed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be documented. Probably on the struct itself. Something like:

Any refused connection will be closed with an error code of `0` and reason `not allowed`.

(I can also imagine folks wanting this configurable. But I think that's possible in a backwards-compatible way so should be fine to add later. I can also imagine a world where the function returns those values.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course we/they can just make an AsyncAcessLimit which is probably nicer than forcing everyone to use async.

yeah, this is more of an example for me

anyhow::bail!("not allowed");
}
this.proto.accept(conn).await?;
Ok(())
})
}

fn shutdown(&self) -> BoxFuture<()> {
self.proto.shutdown()
}
}

Expand All @@ -374,4 +449,53 @@ mod tests {

Ok(())
}

// The protocol definition:
#[derive(Debug, Clone)]
struct Echo;

const ECHO_ALPN: &[u8] = b"/iroh/echo/1";

impl ProtocolHandler for Echo {
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
println!("accepting echo");
Box::pin(async move {
let (mut send, mut recv) = connection.accept_bi().await?;

// Echo any bytes received back directly.
let _bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;

send.finish()?;
connection.closed().await;

Ok(())
})
}
}
#[tokio::test]
async fn test_limiter() -> Result<()> {
let e1 = Endpoint::builder().bind().await?;
// deny all access
let proto = AccessLimit::new(Echo, |_node_id| false);
let r1 = Router::builder(e1.clone())
.accept(ECHO_ALPN, proto)
.spawn()
.await?;

let addr1 = r1.endpoint().node_addr().await?;

let e2 = Endpoint::builder().bind().await?;

println!("connecting");
let conn = e2.connect(addr1, ECHO_ALPN).await?;

let (_send, mut recv) = conn.open_bi().await?;
let response = recv.read_to_end(1000).await.unwrap_err();
assert!(format!("{:#?}", response).contains("not allowed"));

r1.shutdown().await?;
e2.close().await;

Ok(())
}
}
Loading