Skip to content

Commit 2964569

Browse files
authored
refactor(iroh): Remove flume from iroh gossip (#2542)
## Description refactor(iroh): Remove flume from iroh gossip Yes, I know there is a PR that touches gossip. But just let me do my purge. ## Breaking Changes None ## Notes & open questions None ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent 22314a1 commit 2964569

File tree

3 files changed

+10
-11
lines changed

3 files changed

+10
-11
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-gossip/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
3838
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }
3939

4040
# dispatcher dependencies (optional)
41+
async-channel = { version = "2.3.1", optional = true }
4142
futures-util = { version = "0.3.30", optional = true }
42-
flume = { version = "0.11", optional = true }
4343

4444
[dev-dependencies]
4545
clap = { version = "4", features = ["derive"] }
@@ -51,7 +51,7 @@ url = "2.4.0"
5151
[features]
5252
default = ["net", "dispatcher"]
5353
net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
54-
dispatcher = ["dep:flume", "dep:futures-util"]
54+
dispatcher = ["dep:async-channel", "dep:futures-util"]
5555

5656
[[example]]
5757
name = "chat"

iroh-gossip/src/dispatcher.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
proto::{DeliveryScope, TopicId},
1111
};
1212
use bytes::Bytes;
13+
use futures_lite::StreamExt;
1314
use futures_util::Stream;
1415
use iroh_base::rpc::{RpcError, RpcResult};
1516
use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId};
@@ -106,7 +107,7 @@ struct State {
106107
/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds.
107108
type CommandStream = Box<dyn Stream<Item = Command> + Send + Sync + Unpin + 'static>;
108109
/// Type alias for a sink of gossip events.
109-
type EventSink = flume::Sender<RpcResult<Event>>;
110+
type EventSink = async_channel::Sender<RpcResult<Event>>;
110111

111112
#[derive(derive_more::Debug)]
112113
enum TopicState {
@@ -214,7 +215,7 @@ impl GossipDispatcher {
214215
/// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full.
215216
fn try_send(send: &EventSink, event: &IrohGossipEvent) -> bool {
216217
// If the stream is disconnected, we don't need to send to it.
217-
if send.is_disconnected() {
218+
if send.is_closed() {
218219
return false;
219220
}
220221
// Check if the send buffer is almost full, and send a lagged response if it is.
@@ -234,7 +235,6 @@ impl GossipDispatcher {
234235
///
235236
/// This should not fail unless the gossip instance is faulty.
236237
async fn dispatch_loop(mut self) -> anyhow::Result<()> {
237-
use futures_lite::stream::StreamExt;
238238
let stream = self.gossip.clone().subscribe_all();
239239
tokio::pin!(stream);
240240
while let Some(item) = stream.next().await {
@@ -306,7 +306,6 @@ impl GossipDispatcher {
306306
topic: TopicId,
307307
mut updates: CommandStream,
308308
) -> anyhow::Result<()> {
309-
use futures_lite::stream::StreamExt;
310309
while let Some(update) = Pin::new(&mut updates).next().await {
311310
match update {
312311
Command::Broadcast(msg) => {
@@ -404,7 +403,7 @@ impl GossipDispatcher {
404403
let mut update_tasks = vec![];
405404
for (updates, event_sink) in waiting {
406405
// if the stream is disconnected, we don't need to keep it and start the update task
407-
if event_sink.is_disconnected() {
406+
if event_sink.is_closed() {
408407
continue;
409408
}
410409
event_sinks.push(event_sink);
@@ -438,9 +437,9 @@ impl GossipDispatcher {
438437
topic: TopicId,
439438
options: SubscribeOptions,
440439
updates: CommandStream,
441-
) -> impl Stream<Item = RpcResult<Event>> {
440+
) -> impl Stream<Item = RpcResult<Event>> + Unpin {
442441
let mut inner = self.inner.lock().unwrap();
443-
let (send, recv) = flume::bounded(options.subscription_capacity);
442+
let (send, recv) = async_channel::bounded(options.subscription_capacity);
444443
match inner.current_subscriptions.entry(topic) {
445444
Entry::Vacant(entry) => {
446445
// There is no existing subscription, so we need to start a new one.
@@ -490,7 +489,7 @@ impl GossipDispatcher {
490489
}
491490
}
492491
}
493-
recv.into_stream()
492+
recv.boxed()
494493
}
495494
}
496495

0 commit comments

Comments
 (0)