-
Notifications
You must be signed in to change notification settings - Fork 236
feat(iroh)!: allow for limiting incoming connections on the router #3157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
54e6b66
97239a6
2ac95c9
e60180b
8c927d8
da7fb55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
//! ```no_run | ||
//! # use anyhow::Result; | ||
//! # use futures_lite::future::Boxed as BoxedFuture; | ||
//! # use iroh::{endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr}; | ||
//! # use iroh::{endpoint::Connection, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr}; | ||
//! # | ||
//! # async fn test_compile() -> Result<()> { | ||
//! let endpoint = Endpoint::builder().discovery_n0().bind().await?; | ||
|
@@ -22,9 +22,8 @@ | |
//! struct Echo; | ||
//! | ||
//! impl ProtocolHandler for Echo { | ||
//! fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> { | ||
//! fn accept(&self, connection: Connection) -> BoxedFuture<Result<()>> { | ||
//! Box::pin(async move { | ||
//! let connection = connecting.await?; | ||
//! let (mut send, mut recv) = connection.accept_bi().await?; | ||
//! | ||
//! // Echo any bytes received back directly. | ||
|
@@ -41,6 +40,7 @@ | |
use std::{collections::BTreeMap, sync::Arc}; | ||
|
||
use anyhow::Result; | ||
use iroh_base::NodeId; | ||
use n0_future::{ | ||
boxed::BoxFuture, | ||
join_all, | ||
|
@@ -50,7 +50,10 @@ use tokio::sync::Mutex; | |
use tokio_util::sync::CancellationToken; | ||
use tracing::{error, info_span, trace, warn, Instrument}; | ||
|
||
use crate::{endpoint::Connecting, Endpoint}; | ||
use crate::{ | ||
endpoint::{Connecting, Connection}, | ||
Endpoint, | ||
}; | ||
|
||
/// The built router. | ||
/// | ||
|
@@ -109,10 +112,18 @@ pub struct RouterBuilder { | |
/// The protocol handler must then be registered on the node for an ALPN protocol with | ||
/// [`crate::protocol::RouterBuilder::accept`]. | ||
pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { | ||
/// Optional interception point to handle the `Connecting` state. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm always a fan of being extremely verbose on the docs. I would add a paragraph suggesting when you might to use this:
|
||
fn on_connecting(&self, connecting: Connecting) -> BoxFuture<Result<Connection>> { | ||
Box::pin(async move { | ||
let conn = connecting.await?; | ||
Ok(conn) | ||
}) | ||
} | ||
|
||
/// Handle an incoming connection. | ||
/// | ||
/// This runs on a freshly spawned tokio task so this can be long-running. | ||
fn accept(&self, conn: Connecting) -> BoxFuture<Result<()>>; | ||
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>>; | ||
|
||
/// Called when the node shuts down. | ||
fn shutdown(&self) -> BoxFuture<()> { | ||
|
@@ -121,7 +132,11 @@ pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static { | |
} | ||
|
||
impl<T: ProtocolHandler> ProtocolHandler for Arc<T> { | ||
fn accept(&self, conn: Connecting) -> BoxFuture<Result<()>> { | ||
fn on_connecting(&self, conn: Connecting) -> BoxFuture<Result<Connection>> { | ||
self.as_ref().on_connecting(conn) | ||
} | ||
|
||
fn accept(&self, conn: Connection) -> BoxFuture<Result<()>> { | ||
self.as_ref().accept(conn) | ||
} | ||
|
||
|
@@ -131,7 +146,11 @@ impl<T: ProtocolHandler> ProtocolHandler for Arc<T> { | |
} | ||
|
||
impl<T: ProtocolHandler> ProtocolHandler for Box<T> { | ||
fn accept(&self, conn: Connecting) -> BoxFuture<Result<()>> { | ||
fn on_connecting(&self, conn: Connecting) -> BoxFuture<Result<Connection>> { | ||
self.as_ref().on_connecting(conn) | ||
} | ||
|
||
fn accept(&self, conn: Connection) -> BoxFuture<Result<()>> { | ||
self.as_ref().accept(conn) | ||
} | ||
|
||
|
@@ -350,8 +369,64 @@ async fn handle_connection(incoming: crate::endpoint::Incoming, protocols: Arc<P | |
warn!("Ignoring connection: unsupported ALPN protocol"); | ||
return; | ||
}; | ||
if let Err(err) = handler.accept(connecting).await { | ||
warn!("Handling incoming connection ended with error: {err}"); | ||
match handler.on_connecting(connecting).await { | ||
Ok(connection) => { | ||
if let Err(err) = handler.accept(connection).await { | ||
warn!("Handling incoming connection ended with error: {err}"); | ||
} | ||
} | ||
Err(err) => { | ||
warn!("Handling incoming connecting ended with error: {err}"); | ||
} | ||
} | ||
} | ||
|
||
/// Wraps an existing protocol, limiting its access, | ||
/// based on the provided function. | ||
#[derive(derive_more::Debug, Clone)] | ||
pub struct AccessLimit<P: ProtocolHandler + Clone> { | ||
proto: P, | ||
#[debug("limiter")] | ||
limiter: Arc<dyn Fn(NodeId) -> bool + Send + Sync + 'static>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if this was discussed before: will we get folks upset that they can't do async stuff here? I think this itself runs in an async context. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Of course we/they can just make an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How would you feel if I created a PR for AsyncAccessLimit? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, why not. Don't think there'd be an objection to this. |
||
} | ||
|
||
impl<P: ProtocolHandler + Clone> AccessLimit<P> { | ||
/// Create a new `AccessLimit`. | ||
/// | ||
/// The function should return `true` for nodes that are allowed to | ||
/// connect, and `false` otherwise. | ||
pub fn new<F>(proto: P, limiter: F) -> Self | ||
where | ||
F: Fn(NodeId) -> bool + Send + Sync + 'static, | ||
{ | ||
Self { | ||
proto, | ||
limiter: Arc::new(limiter), | ||
} | ||
} | ||
} | ||
|
||
impl<P: ProtocolHandler + Clone> ProtocolHandler for AccessLimit<P> { | ||
fn on_connecting(&self, conn: Connecting) -> BoxFuture<Result<Connection>> { | ||
self.proto.on_connecting(conn) | ||
} | ||
|
||
fn accept(&self, conn: Connection) -> BoxFuture<Result<()>> { | ||
let this = self.clone(); | ||
Box::pin(async move { | ||
let remote = conn.remote_node_id()?; | ||
let is_allowed = (this.limiter)(remote); | ||
if !is_allowed { | ||
conn.close(0u32.into(), b"not allowed"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be documented. Probably on the struct itself. Something like:
(I can also imagine folks wanting this configurable. But I think that's possible in a backwards-compatible way so should be fine to add later. I can also imagine a world where the function returns those values.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yeah, this is more of an example for me |
||
anyhow::bail!("not allowed"); | ||
} | ||
this.proto.accept(conn).await?; | ||
Ok(()) | ||
}) | ||
} | ||
|
||
fn shutdown(&self) -> BoxFuture<()> { | ||
self.proto.shutdown() | ||
} | ||
} | ||
|
||
|
@@ -374,4 +449,53 @@ mod tests { | |
|
||
Ok(()) | ||
} | ||
|
||
// The protocol definition: | ||
#[derive(Debug, Clone)] | ||
struct Echo; | ||
|
||
const ECHO_ALPN: &[u8] = b"/iroh/echo/1"; | ||
|
||
impl ProtocolHandler for Echo { | ||
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> { | ||
println!("accepting echo"); | ||
Box::pin(async move { | ||
let (mut send, mut recv) = connection.accept_bi().await?; | ||
|
||
// Echo any bytes received back directly. | ||
let _bytes_sent = tokio::io::copy(&mut recv, &mut send).await?; | ||
|
||
send.finish()?; | ||
connection.closed().await; | ||
|
||
Ok(()) | ||
}) | ||
} | ||
} | ||
#[tokio::test] | ||
async fn test_limiter() -> Result<()> { | ||
let e1 = Endpoint::builder().bind().await?; | ||
// deny all access | ||
let proto = AccessLimit::new(Echo, |_node_id| false); | ||
let r1 = Router::builder(e1.clone()) | ||
.accept(ECHO_ALPN, proto) | ||
.spawn() | ||
.await?; | ||
|
||
let addr1 = r1.endpoint().node_addr().await?; | ||
|
||
let e2 = Endpoint::builder().bind().await?; | ||
|
||
println!("connecting"); | ||
let conn = e2.connect(addr1, ECHO_ALPN).await?; | ||
|
||
let (_send, mut recv) = conn.open_bi().await?; | ||
let response = recv.read_to_end(1000).await.unwrap_err(); | ||
assert!(format!("{:#?}", response).contains("not allowed")); | ||
|
||
r1.shutdown().await?; | ||
e2.close().await; | ||
|
||
Ok(()) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.