diff --git a/Cargo.lock b/Cargo.lock index f591bb74ad..7eb4da3ff3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2478,6 +2478,7 @@ name = "iroh" version = "0.21.0" dependencies = [ "anyhow", + "async-channel", "bao-tree", "bytes", "clap", @@ -2732,10 +2733,10 @@ name = "iroh-docs" version = "0.21.0" dependencies = [ "anyhow", + "async-channel", "bytes", "derive_more", "ed25519-dalek", - "flume", "futures-buffered", "futures-lite 2.3.0", "futures-util", diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index 60a3fdb494..180898d423 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -16,11 +16,11 @@ workspace = true [dependencies] anyhow = "1" +async-channel = "2.3.1" blake3 = { package = "iroh-blake3", version = "1.4.5"} bytes = { version = "1.4", features = ["serde"] } derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] } ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } -flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3.0" futures-util = { version = "0.3.25" } diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index 769f14a482..79bd34ad28 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -61,12 +61,12 @@ enum Action { #[display("ListAuthors")] ListAuthors { #[debug("reply")] - reply: flume::Sender>, + reply: async_channel::Sender>, }, #[display("ListReplicas")] ListReplicas { #[debug("reply")] - reply: flume::Sender>, + reply: async_channel::Sender>, }, #[display("ContentHashes")] ContentHashes { @@ -108,12 +108,12 @@ enum ReplicaAction { reply: oneshot::Sender>, }, Subscribe { - sender: flume::Sender, + sender: async_channel::Sender, #[debug("reply")] reply: oneshot::Sender>, }, Unsubscribe { - sender: flume::Sender, + sender: async_channel::Sender, #[debug("reply")] reply: oneshot::Sender>, }, @@ -166,7 +166,7 @@ enum ReplicaAction { }, GetMany { query: Query, - reply: flume::Sender>, + reply: async_channel::Sender>, }, DropReplica { reply: oneshot::Sender>, @@ -222,7 +222,7 @@ struct OpenReplica { /// [`SyncHandle::drop`] will not block. #[derive(Debug, Clone)] pub struct SyncHandle { - tx: flume::Sender, + tx: async_channel::Sender, join_handle: Arc>>, } @@ -232,7 +232,7 @@ pub struct OpenOpts { /// Set to true to set sync state to true. pub sync: bool, /// Optionally subscribe to replica events. - pub subscribe: Option>, + pub subscribe: Option>, } impl OpenOpts { /// Set sync state to true. @@ -241,7 +241,7 @@ impl OpenOpts { self } /// Subscribe to replica events. - pub fn subscribe(mut self, subscribe: flume::Sender) -> Self { + pub fn subscribe(mut self, subscribe: async_channel::Sender) -> Self { self.subscribe = Some(subscribe); self } @@ -255,7 +255,7 @@ impl SyncHandle { content_status_callback: Option, me: String, ) -> SyncHandle { - let (action_tx, action_rx) = flume::bounded(ACTION_CAP); + let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP); let actor = Actor { store, states: Default::default(), @@ -298,7 +298,7 @@ impl SyncHandle { pub async fn subscribe( &self, namespace: NamespaceId, - sender: flume::Sender, + sender: async_channel::Sender, ) -> Result<()> { let (reply, rx) = oneshot::channel(); self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply }) @@ -309,7 +309,7 @@ impl SyncHandle { pub async fn unsubscribe( &self, namespace: NamespaceId, - sender: flume::Sender, + sender: async_channel::Sender, ) -> Result<()> { let (reply, rx) = oneshot::channel(); self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply }) @@ -435,7 +435,7 @@ impl SyncHandle { &self, namespace: NamespaceId, query: Query, - reply: flume::Sender>, + reply: async_channel::Sender>, ) -> Result<()> { let action = ReplicaAction::GetMany { query, reply }; self.send_replica(namespace, action).await?; @@ -489,13 +489,13 @@ impl SyncHandle { Ok(store) } - pub async fn list_authors(&self, reply: flume::Sender>) -> Result<()> { + pub async fn list_authors(&self, reply: async_channel::Sender>) -> Result<()> { self.send(Action::ListAuthors { reply }).await } pub async fn list_replicas( &self, - reply: flume::Sender>, + reply: async_channel::Sender>, ) -> Result<()> { self.send(Action::ListReplicas { reply }).await } @@ -566,7 +566,7 @@ impl SyncHandle { async fn send(&self, action: Action) -> Result<()> { self.tx - .send_async(action) + .send(action) .await .context("sending to iroh_docs actor failed")?; Ok(()) @@ -581,7 +581,10 @@ impl Drop for SyncHandle { fn drop(&mut self) { // this means we're dropping the last reference if let Some(handle) = Arc::get_mut(&mut self.join_handle) { - self.tx.send(Action::Shutdown { reply: None }).ok(); + // this call is the reason tx can not be a tokio mpsc channel. + // we have no control about where drop is called, yet tokio send_blocking panics + // when called from inside a tokio runtime. + self.tx.send_blocking(Action::Shutdown { reply: None }).ok(); let handle = handle.take().expect("this can only run once"); if let Err(err) = handle.join() { warn!(?err, "Failed to join sync actor"); @@ -593,7 +596,7 @@ impl Drop for SyncHandle { struct Actor { store: Store, states: OpenReplicas, - action_rx: flume::Receiver, + action_rx: async_channel::Receiver, content_status_callback: Option, tasks: JoinSet<()>, } @@ -619,10 +622,10 @@ impl Actor { } continue; } - action = self.action_rx.recv_async() => { + action = self.action_rx.recv() => { match action { Ok(action) => action, - Err(flume::RecvError::Disconnected) => { + Err(async_channel::RecvError) => { debug!("action channel disconnected"); break None; } @@ -979,17 +982,14 @@ impl OpenReplicas { } async fn iter_to_channel_async( - channel: flume::Sender>, + channel: async_channel::Sender>, iter: Result>>, ) -> Result<(), SendReplyError> { match iter { - Err(err) => channel - .send_async(Err(err)) - .await - .map_err(send_reply_error)?, + Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?, Ok(iter) => { for item in iter { - channel.send_async(item).await.map_err(send_reply_error)?; + channel.send(item).await.map_err(send_reply_error)?; } } } @@ -1032,10 +1032,10 @@ mod tests { let id = namespace.id(); sync.import_namespace(namespace.into()).await?; sync.open(id, Default::default()).await?; - let (tx, rx) = flume::bounded(10); + let (tx, rx) = async_channel::bounded(10); sync.subscribe(id, tx).await?; sync.close(id).await?; - assert!(rx.recv_async().await.is_err()); + assert!(rx.recv().await.is_err()); Ok(()) } } diff --git a/iroh-docs/src/engine.rs b/iroh-docs/src/engine.rs index a408df2793..1beebd2730 100644 --- a/iroh-docs/src/engine.rs +++ b/iroh-docs/src/engine.rs @@ -170,15 +170,15 @@ impl Engine { // Subscribe to insert events from the replica. let a = { - let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP); + let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP); this.sync.subscribe(namespace, s).await?; - r.into_stream() - .map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb)) + Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb)) }; // Subscribe to events from the [`live::Actor`]. let b = { - let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP); + let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP); + let r = Box::pin(r); let (reply, reply_rx) = oneshot::channel(); this.to_live_actor .send(ToLiveActor::Subscribe { @@ -188,7 +188,7 @@ impl Engine { }) .await?; reply_rx.await??; - r.into_stream().map(|event| Ok(LiveEvent::from(event))) + r.map(|event| Ok(LiveEvent::from(event))) }; Ok(a.or(b)) diff --git a/iroh-docs/src/engine/live.rs b/iroh-docs/src/engine/live.rs index f41744ac73..6e49536baa 100644 --- a/iroh-docs/src/engine/live.rs +++ b/iroh-docs/src/engine/live.rs @@ -78,7 +78,7 @@ pub enum ToLiveActor { Subscribe { namespace: NamespaceId, #[debug("sender")] - sender: flume::Sender, + sender: async_channel::Sender, #[debug("oneshot::Sender")] reply: sync::oneshot::Sender>, }, @@ -153,8 +153,8 @@ pub struct LiveActor { gossip: Gossip, bao_store: B, downloader: Downloader, - replica_events_tx: flume::Sender, - replica_events_rx: flume::Receiver, + replica_events_tx: async_channel::Sender, + replica_events_rx: async_channel::Receiver, /// Send messages to self. /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks. @@ -192,7 +192,7 @@ impl LiveActor { sync_actor_tx: mpsc::Sender, gossip_actor_tx: mpsc::Sender, ) -> Self { - let (replica_events_tx, replica_events_rx) = flume::bounded(1024); + let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024); Self { inbox, sync, @@ -262,7 +262,7 @@ impl LiveActor { } } } - event = self.replica_events_rx.recv_async() => { + event = self.replica_events_rx.recv() => { trace!(?i, "tick: replica_event"); inc!(Metrics, doc_live_tick_replica_event); let event = event.context("replica_events closed")?; @@ -865,7 +865,7 @@ impl From<&SyncFinished> for SyncDetails { struct SubscribersMap(HashMap); impl SubscribersMap { - fn subscribe(&mut self, namespace: NamespaceId, sender: flume::Sender) { + fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender) { self.0.entry(namespace).or_default().subscribe(sender); } @@ -930,15 +930,15 @@ impl QueuedHashes { } #[derive(Debug, Default)] -struct Subscribers(Vec>); +struct Subscribers(Vec>); impl Subscribers { - fn subscribe(&mut self, sender: flume::Sender) { + fn subscribe(&mut self, sender: async_channel::Sender) { self.0.push(sender) } async fn send(&mut self, event: Event) -> bool { - let futs = self.0.iter().map(|sender| sender.send_async(event.clone())); + let futs = self.0.iter().map(|sender| sender.send(event.clone())); let res = futures_buffered::join_all(futs).await; // reverse the order so removing does not shift remaining indices for (i, res) in res.into_iter().enumerate().rev() { @@ -977,8 +977,8 @@ mod tests { #[tokio::test] async fn test_sync_remove() { let pk = PublicKey::from_bytes(&[1; 32]).unwrap(); - let (a_tx, a_rx) = flume::unbounded(); - let (b_tx, b_rx) = flume::unbounded(); + let (a_tx, a_rx) = async_channel::unbounded(); + let (b_tx, b_rx) = async_channel::unbounded(); let mut subscribers = Subscribers::default(); subscribers.subscribe(a_tx); subscribers.subscribe(b_tx); diff --git a/iroh-docs/src/sync.rs b/iroh-docs/src/sync.rs index 5d3896f4bc..773f497e69 100644 --- a/iroh-docs/src/sync.rs +++ b/iroh-docs/src/sync.rs @@ -108,17 +108,34 @@ pub struct SyncOutcome { pub num_sent: usize, } +fn get_as_ptr(value: &T) -> Option { + use std::mem; + if mem::size_of::() == std::mem::size_of::() + && mem::align_of::() == mem::align_of::() + { + // Safe only if size and alignment requirements are met + unsafe { Some(mem::transmute_copy(value)) } + } else { + None + } +} + +fn same_channel(a: &async_channel::Sender, b: &async_channel::Sender) -> bool { + get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap() +} + #[derive(Debug, Default)] -struct Subscribers(Vec>); +struct Subscribers(Vec>); impl Subscribers { - pub fn subscribe(&mut self, sender: flume::Sender) { + pub fn subscribe(&mut self, sender: async_channel::Sender) { self.0.push(sender) } - pub fn unsubscribe(&mut self, sender: &flume::Sender) { - self.0.retain(|s| !s.same_channel(sender)); + pub fn unsubscribe(&mut self, sender: &async_channel::Sender) { + self.0.retain(|s| !same_channel(s, sender)); } pub fn send(&mut self, event: Event) { - self.0.retain(|sender| sender.send(event.clone()).is_ok()) + self.0 + .retain(|sender| sender.send_blocking(event.clone()).is_ok()) } pub fn len(&self) -> usize { self.0.len() @@ -263,10 +280,10 @@ impl ReplicaInfo { /// Subscribe to insert events. /// - /// When subscribing to a replica, you must ensure that the corresponding [`flume::Receiver`] is + /// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is /// received from in a loop. If not receiving, local and remote inserts will hang waiting for /// the receiver to be received from. - pub fn subscribe(&mut self, sender: flume::Sender) { + pub fn subscribe(&mut self, sender: async_channel::Sender) { self.subscribers.subscribe(sender) } @@ -275,7 +292,7 @@ impl ReplicaInfo { /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to /// multiple replicas, you can use this method to explicitly unsubscribe the sender from /// this replica without having to drop the receiver. - pub fn unsubscribe(&mut self, sender: &flume::Sender) { + pub fn unsubscribe(&mut self, sender: &async_channel::Sender) { self.subscribers.unsubscribe(sender) } @@ -2156,6 +2173,14 @@ mod tests { Ok(()) } + fn drain(events: async_channel::Receiver) -> Vec { + let mut res = vec![]; + while let Ok(ev) = events.try_recv() { + res.push(ev); + } + res + } + /// This tests that no events are emitted for entries received during sync which are obsolete /// (too old) by the time they are actually inserted in the store. #[test] @@ -2173,8 +2198,8 @@ mod tests { let mut replica1 = store1.new_replica(namespace.clone())?; let mut replica2 = store2.new_replica(namespace.clone())?; - let (events1_sender, events1) = flume::bounded(32); - let (events2_sender, events2) = flume::bounded(32); + let (events1_sender, events1) = async_channel::bounded(32); + let (events2_sender, events2) = async_channel::bounded(32); replica1.info.subscribe(events1_sender); replica2.info.subscribe(events2_sender); @@ -2198,8 +2223,8 @@ mod tests { .sync_process_message(from1, peer1, &mut state2) .unwrap(); assert!(from2.is_none()); - let events1 = events1.drain().collect::>(); - let events2 = events2.drain().collect::>(); + let events1 = drain(events1); + let events2 = drain(events2); assert_eq!(events1.len(), 1); assert_eq!(events2.len(), 1); assert!(matches!(events1[0], Event::LocalInsert { .. })); diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 4ec069fe30..d31990e63f 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -17,6 +17,7 @@ workspace = true [dependencies] anyhow = { version = "1" } +async-channel = "2.3.1" bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false } bytes = "1" derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "from_str"] } diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index 3fc35bc597..e974d68733 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -1,11 +1,10 @@ //! This module contains an impl block on [`DocsEngine`] with handlers for RPC requests use anyhow::anyhow; -use futures_lite::Stream; +use futures_lite::{Stream, StreamExt}; use iroh_base::rpc::RpcResult; use iroh_blobs::{store::Store as BaoStore, BlobFormat}; use iroh_docs::{Author, DocTicket, NamespaceSecret}; -use tokio_stream::StreamExt; use crate::client::docs::ShareMode; use crate::node::DocsEngine; @@ -60,18 +59,18 @@ impl DocsEngine { pub fn author_list( &self, _req: AuthorListRequest, - ) -> impl Stream> { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + ) -> impl Stream> + Unpin { + let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. tokio::task::spawn(async move { let tx2 = tx.clone(); if let Err(err) = sync.list_authors(tx).await { - tx2.send_async(Err(err)).await.ok(); + tx2.send(Err(err)).await.ok(); } }); - rx.into_stream().map(|r| { + rx.boxed().map(|r| { r.map(|author_id| AuthorListResponse { author_id }) .map_err(Into::into) }) @@ -111,18 +110,21 @@ impl DocsEngine { Ok(DropResponse {}) } - pub fn doc_list(&self, _req: DocListRequest) -> impl Stream> { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + pub fn doc_list( + &self, + _req: DocListRequest, + ) -> impl Stream> + Unpin { + let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. tokio::task::spawn(async move { let tx2 = tx.clone(); if let Err(err) = sync.list_replicas(tx).await { - tx2.send_async(Err(err)).await.ok(); + tx2.send(Err(err)).await.ok(); } }); - rx.into_stream().map(|r| { + rx.boxed().map(|r| { r.map(|(id, capability)| DocListResponse { id, capability }) .map_err(Into::into) }) @@ -249,19 +251,19 @@ impl DocsEngine { pub fn doc_get_many( &self, req: GetManyRequest, - ) -> impl Stream> { + ) -> impl Stream> + Unpin { let GetManyRequest { doc_id, query } = req; - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. tokio::task::spawn(async move { let tx2 = tx.clone(); if let Err(err) = sync.get_many(doc_id, query, tx).await { - tx2.send_async(Err(err)).await.ok(); + tx2.send(Err(err)).await.ok(); } }); - rx.into_stream() + rx.boxed() .map(|r| r.map(|entry| GetManyResponse { entry }).map_err(Into::into)) }