
Commit 08f1fe0

refactor(iroh): move code from builder to node and make things nicer (#2386)
## Description

This is a cleanup of the node and builder code.

* Move `run` and `gc_loop` from `builder.rs` to `node.rs` - they are not about building but about running.
* Improve the code flow and naming all around the builder, spawn and run functions.
* Make sure we shut down the node correctly while spawning.

## Breaking Changes

* Removed `Node::controller`. Use `Node::client` instead. The former was mostly unusable anyway because we made the RPC structs private. (A migration sketch follows below.)

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] ~~Tests if relevant.~~
- [x] All breaking changes documented.
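For users hit by the breaking change, the updated test at the bottom of this diff already shows the replacement call path. The sketch below restates it as a standalone helper. It is a minimal, hypothetical example, not code from this commit: the helper name `add_file` is invented, the `crate::client::blobs::WrapOption` and `crate::rpc_protocol::SetTagOption` paths are the in-crate paths used by the diff (external callers would go through the corresponding `iroh` re-exports), and it assumes a node spawned elsewhere.

```rust
use std::path::Path;

use anyhow::Result;
use futures_lite::StreamExt;

use crate::client::blobs::WrapOption;
use crate::node::Node;
use crate::rpc_protocol::SetTagOption;

/// Hypothetical helper showing the migration off the removed `Node::controller`.
async fn add_file<D: iroh_blobs::store::Store>(node: &Node<D>, path: &Path) -> Result<()> {
    // Before this commit:
    //     node.controller().server_streaming(BlobAddPathRequest { .. }).await?
    // After: go through the in-memory client returned by `Node::client`
    // (`Node` also derefs to the same client, so `node.blobs()` works too).
    let mut stream = node
        .client()
        .blobs()
        .add_from_path(
            path.to_owned(),
            false,              // in_place
            SetTagOption::Auto, // tag
            WrapOption::NoWrap, // wrap
        )
        .await?;

    // The stream now yields progress events directly, without the old
    // `BlobAddPathResponse` wrapper.
    while let Some(progress) = stream.next().await {
        let _ = progress?;
    }
    Ok(())
}
```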
1 parent 13ded84 commit 08f1fe0

File tree

3 files changed: +612 -613 lines changed


iroh/src/node.rs

Lines changed: 259 additions & 29 deletions
@@ -3,26 +3,26 @@
//! A node is a server that serves various protocols.
//!
//! To shut down the node, call [`Node::shutdown`].
-use std::fmt::Debug;
-use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
+use std::{collections::BTreeSet, net::SocketAddr};
+use std::{fmt::Debug, time::Duration};

use anyhow::{anyhow, Result};
use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
-use iroh_blobs::downloader::Downloader;
-use iroh_blobs::store::Store as BaoStore;
+use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore};
+use iroh_blobs::{downloader::Downloader, protocol::Closed};
use iroh_docs::engine::Engine;
use iroh_gossip::net::Gossip;
use iroh_net::key::SecretKey;
use iroh_net::Endpoint;
use iroh_net::{endpoint::DirectAddrsStream, util::SharedAbortingJoinHandle};
-use quic_rpc::transport::flume::FlumeConnection;
-use quic_rpc::RpcClient;
+use quic_rpc::{RpcServer, ServiceEndpoint};
+use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
-use tracing::debug;
+use tracing::{debug, error, info, warn};

use crate::{client::RpcService, node::protocol::ProtocolMap};

@@ -48,20 +48,19 @@ pub use protocol::ProtocolHandler;
#[derive(Debug, Clone)]
pub struct Node<D> {
    inner: Arc<NodeInner<D>>,
-    client: crate::client::MemIroh,
    task: SharedAbortingJoinHandle<()>,
    protocols: Arc<ProtocolMap>,
}

#[derive(derive_more::Debug)]
struct NodeInner<D> {
    db: D,
-    sync: DocsEngine,
+    docs: DocsEngine,
    endpoint: Endpoint,
    gossip: Gossip,
    secret_key: SecretKey,
    cancel_token: CancellationToken,
-    controller: FlumeConnection<RpcService>,
+    client: crate::client::MemIroh,
    #[debug("rt")]
    rt: LocalPoolHandle,
    downloader: Downloader,
@@ -133,14 +132,9 @@ impl<D: BaoStore> Node<D> {
        self.inner.secret_key.public()
    }

-    /// Returns a handle that can be used to do RPC calls to the node internally.
-    pub fn controller(&self) -> crate::client::MemRpcClient {
-        RpcClient::new(self.inner.controller.clone())
-    }
-
    /// Return a client to control this node over an in-memory channel.
    pub fn client(&self) -> &crate::client::MemIroh {
-        &self.client
+        &self.inner.client
    }

    /// Returns a referenc to the used `LocalPoolHandle`.
@@ -189,11 +183,11 @@ impl<D> std::ops::Deref for Node<D> {
    type Target = crate::client::MemIroh;

    fn deref(&self) -> &Self::Target {
-        &self.client
+        &self.inner.client
    }
}

-impl<D> NodeInner<D> {
+impl<D: iroh_blobs::store::Store> NodeInner<D> {
    async fn local_endpoint_addresses(&self) -> Result<Vec<SocketAddr>> {
        let endpoints = self
            .endpoint
@@ -203,6 +197,243 @@ impl<D> NodeInner<D> {
            .ok_or(anyhow!("no endpoints found"))?;
        Ok(endpoints.into_iter().map(|x| x.addr).collect())
    }
+
+    async fn run(
+        self: Arc<Self>,
+        external_rpc: impl ServiceEndpoint<RpcService>,
+        internal_rpc: impl ServiceEndpoint<RpcService>,
+        protocols: Arc<ProtocolMap>,
+        gc_policy: GcPolicy,
+        gc_done_callback: Option<Box<dyn Fn() + Send>>,
+    ) {
+        let (ipv4, ipv6) = self.endpoint.bound_sockets();
+        debug!(
+            "listening at: {}{}",
+            ipv4,
+            ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
+        );
+        debug!("rpc listening at: {:?}", external_rpc.local_addr());
+
+        let mut join_set = JoinSet::new();
+
+        // Setup the RPC servers.
+        let external_rpc = RpcServer::new(external_rpc);
+        let internal_rpc = RpcServer::new(internal_rpc);
+
+        // TODO(frando): I think this is not needed as we do the same in a task just below.
+        // forward the initial endpoints to the gossip protocol.
+        // it may happen the the first endpoint update callback is missed because the gossip cell
+        // is only initialized once the endpoint is fully bound
+        if let Some(direct_addresses) = self.endpoint.direct_addresses().next().await {
+            debug!(me = ?self.endpoint.node_id(), "gossip initial update: {direct_addresses:?}");
+            self.gossip.update_direct_addresses(&direct_addresses).ok();
+        }
+
+        // Spawn a task for the garbage collection.
+        if let GcPolicy::Interval(gc_period) = gc_policy {
+            let inner = self.clone();
+            let handle = self
+                .rt
+                .spawn_pinned(move || inner.run_gc_loop(gc_period, gc_done_callback));
+            // We cannot spawn tasks that run on the local pool directly into the join set,
+            // so instead we create a new task that supervises the local task.
+            join_set.spawn({
+                async move {
+                    if let Err(err) = handle.await {
+                        return Err(anyhow::Error::from(err));
+                    }
+                    Ok(())
+                }
+            });
+        }
+
+        // Spawn a task that updates the gossip endpoints.
+        let inner = self.clone();
+        join_set.spawn(async move {
+            let mut stream = inner.endpoint.direct_addresses();
+            while let Some(eps) = stream.next().await {
+                if let Err(err) = inner.gossip.update_direct_addresses(&eps) {
+                    warn!("Failed to update direct addresses for gossip: {err:?}");
+                }
+            }
+            warn!("failed to retrieve local endpoints");
+            Ok(())
+        });
+
+        loop {
+            tokio::select! {
+                biased;
+                _ = self.cancel_token.cancelled() => {
+                    break;
+                },
+                // handle rpc requests. This will do nothing if rpc is not configured, since
+                // accept is just a pending future.
+                request = external_rpc.accept() => {
+                    match request {
+                        Ok((msg, chan)) => {
+                            rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
+                        }
+                        Err(e) => {
+                            info!("rpc request error: {:?}", e);
+                        }
+                    }
+                },
+                // handle internal rpc requests.
+                request = internal_rpc.accept() => {
+                    match request {
+                        Ok((msg, chan)) => {
+                            rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
+                        }
+                        Err(e) => {
+                            info!("internal rpc request error: {:?}", e);
+                        }
+                    }
+                },
+                // handle incoming p2p connections.
+                Some(connecting) = self.endpoint.accept() => {
+                    let protocols = protocols.clone();
+                    join_set.spawn(async move {
+                        handle_connection(connecting, protocols).await;
+                        Ok(())
+                    });
+                },
+                // handle task terminations and quit on panics.
+                res = join_set.join_next(), if !join_set.is_empty() => {
+                    if let Some(Err(err)) = res {
+                        error!("Task failed: {err:?}");
+                        break;
+                    }
+                },
+                else => break,
+            }
+        }
+
+        self.shutdown(protocols).await;
+
+        // Abort remaining tasks.
+        join_set.shutdown().await;
+    }
+
+    async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
+        // Shutdown the different parts of the node concurrently.
+        let error_code = Closed::ProviderTerminating;
+        // We ignore all errors during shutdown.
+        let _ = tokio::join!(
+            // Close the endpoint.
+            // Closing the Endpoint is the equivalent of calling Connection::close on all
+            // connections: Operations will immediately fail with ConnectionError::LocallyClosed.
+            // All streams are interrupted, this is not graceful.
+            self.endpoint
+                .clone()
+                .close(error_code.into(), error_code.reason()),
+            // Shutdown sync engine.
+            self.docs.shutdown(),
+            // Shutdown blobs store engine.
+            self.db.shutdown(),
+            // Shutdown protocol handlers.
+            protocols.shutdown(),
+        );
+    }
+
+    async fn run_gc_loop(
+        self: Arc<Self>,
+        gc_period: Duration,
+        done_cb: Option<Box<dyn Fn() + Send>>,
+    ) {
+        tracing::info!("Starting GC task with interval {:?}", gc_period);
+        let db = &self.db;
+        let docs = &self.docs;
+        let mut live = BTreeSet::new();
+        'outer: loop {
+            if let Err(cause) = db.gc_start().await {
+                tracing::debug!(
+                    "unable to notify the db of GC start: {cause}. Shutting down GC loop."
+                );
+                break;
+            }
+            // do delay before the two phases of GC
+            tokio::time::sleep(gc_period).await;
+            tracing::debug!("Starting GC");
+            live.clear();
+
+            let doc_hashes = match docs.sync.content_hashes().await {
+                Ok(hashes) => hashes,
+                Err(err) => {
+                    tracing::warn!("Error getting doc hashes: {}", err);
+                    continue 'outer;
+                }
+            };
+            for hash in doc_hashes {
+                match hash {
+                    Ok(hash) => {
+                        live.insert(hash);
+                    }
+                    Err(err) => {
+                        tracing::error!("Error getting doc hash: {}", err);
+                        continue 'outer;
+                    }
+                }
+            }
+
+            tracing::debug!("Starting GC mark phase");
+            let mut stream = db.gc_mark(&mut live);
+            while let Some(item) = stream.next().await {
+                match item {
+                    GcMarkEvent::CustomDebug(text) => {
+                        tracing::debug!("{}", text);
+                    }
+                    GcMarkEvent::CustomWarning(text, _) => {
+                        tracing::warn!("{}", text);
+                    }
+                    GcMarkEvent::Error(err) => {
+                        tracing::error!("Fatal error during GC mark {}", err);
+                        continue 'outer;
+                    }
+                }
+            }
+            drop(stream);
+
+            tracing::debug!("Starting GC sweep phase");
+            let mut stream = db.gc_sweep(&live);
+            while let Some(item) = stream.next().await {
+                match item {
+                    GcSweepEvent::CustomDebug(text) => {
+                        tracing::debug!("{}", text);
+                    }
+                    GcSweepEvent::CustomWarning(text, _) => {
+                        tracing::warn!("{}", text);
+                    }
+                    GcSweepEvent::Error(err) => {
+                        tracing::error!("Fatal error during GC mark {}", err);
+                        continue 'outer;
+                    }
+                }
+            }
+            if let Some(ref cb) = done_cb {
+                cb();
+            }
+        }
+    }
+}
+
+async fn handle_connection(
+    mut connecting: iroh_net::endpoint::Connecting,
+    protocols: Arc<ProtocolMap>,
+) {
+    let alpn = match connecting.alpn().await {
+        Ok(alpn) => alpn,
+        Err(err) => {
+            warn!("Ignoring connection: invalid handshake: {:?}", err);
+            return;
+        }
+    };
+    let Some(handler) = protocols.get(&alpn) else {
+        warn!("Ignoring connection: unsupported ALPN protocol");
+        return;
+    };
+    if let Err(err) = handler.accept(connecting).await {
+        warn!("Handling incoming connection ended with error: {err}");
+    }
}

/// Wrapper around [`Engine`] so that we can implement our RPC methods directly.
@@ -228,7 +459,7 @@ mod tests {

    use crate::{
        client::blobs::{AddOutcome, WrapOption},
-        rpc_protocol::{BlobAddPathRequest, BlobAddPathResponse, SetTagOption},
+        rpc_protocol::SetTagOption,
    };

    use super::*;
@@ -289,18 +520,17 @@ mod tests {

        let _got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
            let mut stream = node
-                .controller()
-                .server_streaming(BlobAddPathRequest {
-                    path: Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
-                    in_place: false,
-                    tag: SetTagOption::Auto,
-                    wrap: WrapOption::NoWrap,
-                })
+                .blobs()
+                .add_from_path(
+                    Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
+                    false,
+                    SetTagOption::Auto,
+                    WrapOption::NoWrap,
+                )
                .await?;

-            while let Some(item) = stream.next().await {
-                let BlobAddPathResponse(progress) = item?;
-                match progress {
+            while let Some(progress) = stream.next().await {
+                match progress? {
                    AddProgress::AllDone { hash, .. } => {
                        return Ok(hash);
                    }
