Skip to content

Commit a5720e9

Browse files
committed
Replace flume with async_channel in docs
This is mostly a 1:1 replacement, except for the fact that the same_channel api is missing from async_channel. So I replaced it with some ugly code that uses the fact that a async_channel Sender or Receiver is just an Arc<Channel>. To be removed if/when smol-rs/async-channel#98 is merged, but until then I think it is fine.
1 parent 9052905 commit a5720e9

File tree

9 files changed

+187
-71
lines changed

9 files changed

+187
-71
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-blobs/src/util/progress.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,93 @@ impl<T: Send + Sync + 'static> ProgressSender for FlumeProgressSender<T> {
518518
}
519519
}
520520

521+
/// A progress sender that uses a flume channel.
522+
pub struct AsyncChannelProgressSender<T> {
523+
sender: async_channel::Sender<T>,
524+
id: std::sync::Arc<std::sync::atomic::AtomicU64>,
525+
}
526+
527+
impl<T> std::fmt::Debug for AsyncChannelProgressSender<T> {
528+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529+
f.debug_struct("AsyncChannelProgressSender")
530+
.field("id", &self.id)
531+
.field("sender", &self.sender)
532+
.finish()
533+
}
534+
}
535+
536+
impl<T> Clone for AsyncChannelProgressSender<T> {
537+
fn clone(&self) -> Self {
538+
Self {
539+
sender: self.sender.clone(),
540+
id: self.id.clone(),
541+
}
542+
}
543+
}
544+
545+
impl<T> AsyncChannelProgressSender<T> {
546+
/// Create a new progress sender from a flume sender.
547+
pub fn new(sender: async_channel::Sender<T>) -> Self {
548+
Self {
549+
sender,
550+
id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
551+
}
552+
}
553+
554+
/// Returns true if `other` sends on the same `flume` channel as `self`.
555+
pub fn same_channel(&self, other: &AsyncChannelProgressSender<T>) -> bool {
556+
same_channel(&self.sender, &other.sender)
557+
}
558+
}
559+
560+
fn get_as_ptr<T>(value: &T) -> Option<usize> {
561+
use std::mem;
562+
if mem::size_of::<T>() == std::mem::size_of::<usize>()
563+
&& mem::align_of::<T>() == mem::align_of::<usize>()
564+
{
565+
// Safe only if size and alignment requirements are met
566+
unsafe { Some(mem::transmute_copy(value)) }
567+
} else {
568+
None
569+
}
570+
}
571+
572+
fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
573+
get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
574+
}
575+
576+
impl<T> IdGenerator for AsyncChannelProgressSender<T> {
577+
fn new_id(&self) -> u64 {
578+
self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
579+
}
580+
}
581+
582+
impl<T: Send + Sync + 'static> ProgressSender for AsyncChannelProgressSender<T> {
583+
type Msg = T;
584+
585+
async fn send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> {
586+
self.sender
587+
.send(msg)
588+
.await
589+
.map_err(|_| ProgressSendError::ReceiverDropped)
590+
}
591+
592+
fn try_send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> {
593+
match self.sender.try_send(msg) {
594+
Ok(_) => Ok(()),
595+
Err(async_channel::TrySendError::Full(_)) => Ok(()),
596+
Err(async_channel::TrySendError::Closed(_)) => Err(ProgressSendError::ReceiverDropped),
597+
}
598+
}
599+
600+
fn blocking_send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> {
601+
match self.sender.send_blocking(msg) {
602+
Ok(_) => Ok(()),
603+
Err(_) => Err(ProgressSendError::ReceiverDropped),
604+
}
605+
}
606+
}
607+
521608
/// An error that can occur when sending progress messages.
522609
///
523610
/// Really the only error that can occur is if the receiver is dropped.

iroh-docs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ workspace = true
1616

1717
[dependencies]
1818
anyhow = "1"
19+
async-channel = "2.3.1"
1920
blake3 = { package = "iroh-blake3", version = "1.4.5"}
2021
bytes = { version = "1.4", features = ["serde"] }
2122
derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
2223
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
23-
flume = "0.11"
2424
futures-buffered = "0.2.4"
2525
futures-lite = "2.3.0"
2626
futures-util = { version = "0.3.25" }

iroh-docs/src/actor.rs

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ enum Action {
6161
#[display("ListAuthors")]
6262
ListAuthors {
6363
#[debug("reply")]
64-
reply: flume::Sender<Result<AuthorId>>,
64+
reply: async_channel::Sender<Result<AuthorId>>,
6565
},
6666
#[display("ListReplicas")]
6767
ListReplicas {
6868
#[debug("reply")]
69-
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
69+
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
7070
},
7171
#[display("ContentHashes")]
7272
ContentHashes {
@@ -108,12 +108,12 @@ enum ReplicaAction {
108108
reply: oneshot::Sender<Result<()>>,
109109
},
110110
Subscribe {
111-
sender: flume::Sender<Event>,
111+
sender: async_channel::Sender<Event>,
112112
#[debug("reply")]
113113
reply: oneshot::Sender<Result<()>>,
114114
},
115115
Unsubscribe {
116-
sender: flume::Sender<Event>,
116+
sender: async_channel::Sender<Event>,
117117
#[debug("reply")]
118118
reply: oneshot::Sender<Result<()>>,
119119
},
@@ -166,7 +166,7 @@ enum ReplicaAction {
166166
},
167167
GetMany {
168168
query: Query,
169-
reply: flume::Sender<Result<SignedEntry>>,
169+
reply: async_channel::Sender<Result<SignedEntry>>,
170170
},
171171
DropReplica {
172172
reply: oneshot::Sender<Result<()>>,
@@ -222,7 +222,7 @@ struct OpenReplica {
222222
/// [`SyncHandle::drop`] will not block.
223223
#[derive(Debug, Clone)]
224224
pub struct SyncHandle {
225-
tx: flume::Sender<Action>,
225+
tx: async_channel::Sender<Action>,
226226
join_handle: Arc<Option<JoinHandle<()>>>,
227227
}
228228

@@ -232,7 +232,7 @@ pub struct OpenOpts {
232232
/// Set to true to set sync state to true.
233233
pub sync: bool,
234234
/// Optionally subscribe to replica events.
235-
pub subscribe: Option<flume::Sender<Event>>,
235+
pub subscribe: Option<async_channel::Sender<Event>>,
236236
}
237237
impl OpenOpts {
238238
/// Set sync state to true.
@@ -241,7 +241,7 @@ impl OpenOpts {
241241
self
242242
}
243243
/// Subscribe to replica events.
244-
pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
244+
pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
245245
self.subscribe = Some(subscribe);
246246
self
247247
}
@@ -255,7 +255,7 @@ impl SyncHandle {
255255
content_status_callback: Option<ContentStatusCallback>,
256256
me: String,
257257
) -> SyncHandle {
258-
let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
258+
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
259259
let actor = Actor {
260260
store,
261261
states: Default::default(),
@@ -298,7 +298,7 @@ impl SyncHandle {
298298
pub async fn subscribe(
299299
&self,
300300
namespace: NamespaceId,
301-
sender: flume::Sender<Event>,
301+
sender: async_channel::Sender<Event>,
302302
) -> Result<()> {
303303
let (reply, rx) = oneshot::channel();
304304
self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
@@ -309,7 +309,7 @@ impl SyncHandle {
309309
pub async fn unsubscribe(
310310
&self,
311311
namespace: NamespaceId,
312-
sender: flume::Sender<Event>,
312+
sender: async_channel::Sender<Event>,
313313
) -> Result<()> {
314314
let (reply, rx) = oneshot::channel();
315315
self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
@@ -435,7 +435,7 @@ impl SyncHandle {
435435
&self,
436436
namespace: NamespaceId,
437437
query: Query,
438-
reply: flume::Sender<Result<SignedEntry>>,
438+
reply: async_channel::Sender<Result<SignedEntry>>,
439439
) -> Result<()> {
440440
let action = ReplicaAction::GetMany { query, reply };
441441
self.send_replica(namespace, action).await?;
@@ -489,13 +489,13 @@ impl SyncHandle {
489489
Ok(store)
490490
}
491491

492-
pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
492+
pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
493493
self.send(Action::ListAuthors { reply }).await
494494
}
495495

496496
pub async fn list_replicas(
497497
&self,
498-
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
498+
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
499499
) -> Result<()> {
500500
self.send(Action::ListReplicas { reply }).await
501501
}
@@ -566,7 +566,7 @@ impl SyncHandle {
566566

567567
async fn send(&self, action: Action) -> Result<()> {
568568
self.tx
569-
.send_async(action)
569+
.send(action)
570570
.await
571571
.context("sending to iroh_docs actor failed")?;
572572
Ok(())
@@ -581,7 +581,10 @@ impl Drop for SyncHandle {
581581
fn drop(&mut self) {
582582
// this means we're dropping the last reference
583583
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
584-
self.tx.send(Action::Shutdown { reply: None }).ok();
584+
// this call is the reason tx can not be a tokio mpsc channel.
585+
// we have no control about where drop is called, yet tokio send_blocking panics
586+
// when called from inside a tokio runtime.
587+
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
585588
let handle = handle.take().expect("this can only run once");
586589
if let Err(err) = handle.join() {
587590
warn!(?err, "Failed to join sync actor");
@@ -593,7 +596,7 @@ impl Drop for SyncHandle {
593596
struct Actor {
594597
store: Store,
595598
states: OpenReplicas,
596-
action_rx: flume::Receiver<Action>,
599+
action_rx: async_channel::Receiver<Action>,
597600
content_status_callback: Option<ContentStatusCallback>,
598601
tasks: JoinSet<()>,
599602
}
@@ -619,10 +622,10 @@ impl Actor {
619622
}
620623
continue;
621624
}
622-
action = self.action_rx.recv_async() => {
625+
action = self.action_rx.recv() => {
623626
match action {
624627
Ok(action) => action,
625-
Err(flume::RecvError::Disconnected) => {
628+
Err(async_channel::RecvError) => {
626629
debug!("action channel disconnected");
627630
break None;
628631
}
@@ -979,17 +982,14 @@ impl OpenReplicas {
979982
}
980983

981984
async fn iter_to_channel_async<T: Send + 'static>(
982-
channel: flume::Sender<Result<T>>,
985+
channel: async_channel::Sender<Result<T>>,
983986
iter: Result<impl Iterator<Item = Result<T>>>,
984987
) -> Result<(), SendReplyError> {
985988
match iter {
986-
Err(err) => channel
987-
.send_async(Err(err))
988-
.await
989-
.map_err(send_reply_error)?,
989+
Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
990990
Ok(iter) => {
991991
for item in iter {
992-
channel.send_async(item).await.map_err(send_reply_error)?;
992+
channel.send(item).await.map_err(send_reply_error)?;
993993
}
994994
}
995995
}
@@ -1032,10 +1032,10 @@ mod tests {
10321032
let id = namespace.id();
10331033
sync.import_namespace(namespace.into()).await?;
10341034
sync.open(id, Default::default()).await?;
1035-
let (tx, rx) = flume::bounded(10);
1035+
let (tx, rx) = async_channel::bounded(10);
10361036
sync.subscribe(id, tx).await?;
10371037
sync.close(id).await?;
1038-
assert!(rx.recv_async().await.is_err());
1038+
assert!(rx.recv().await.is_err());
10391039
Ok(())
10401040
}
10411041
}

iroh-docs/src/engine.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,15 @@ impl Engine {
170170

171171
// Subscribe to insert events from the replica.
172172
let a = {
173-
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
173+
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
174174
this.sync.subscribe(namespace, s).await?;
175-
r.into_stream()
176-
.map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
175+
Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
177176
};
178177

179178
// Subscribe to events from the [`live::Actor`].
180179
let b = {
181-
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
180+
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
181+
let r = Box::pin(r);
182182
let (reply, reply_rx) = oneshot::channel();
183183
this.to_live_actor
184184
.send(ToLiveActor::Subscribe {
@@ -188,7 +188,7 @@ impl Engine {
188188
})
189189
.await?;
190190
reply_rx.await??;
191-
r.into_stream().map(|event| Ok(LiveEvent::from(event)))
191+
r.map(|event| Ok(LiveEvent::from(event)))
192192
};
193193

194194
Ok(a.or(b))

0 commit comments

Comments
 (0)