Skip to content

Commit 9052905

Browse files
authored
fix(iroh-blobs): use async_channel instead of flume for local_pool (#2533)
## Description

During soft shutdown of the local pool, a Finish message is sent to all threads. On main, this occasionally hangs. Further investigation showed that this is a message that is being sent but not received despite being in the channel. Adding a simple timeout to the select! so the flume recv call is executed again fixes it.

See discussion in https://discord.com/channels/949724860232392765/950683937661935667/1265205285618847744

So it seems that there is a bug in flume that occasionally leads to notifications being dropped. This PR just does a 1:1 replacement of flume with async_channel.

Before this change, I can get the test_shutdown test to fail easily by running it 1000 times:

```
for i in $(seq 1 1000); do cargo test --release -p iroh-blobs test_shutdown -- --nocapture >> log.txt; done
```

Result:

```
...
sending shutdown message
sending shutdown message
sending shutdown message
sending shutdown message
test util::local_pool::tests::test_shutdown has been running for over 60 seconds
```

After this change, I cannot get test_finish (renamed because it tests finish, not shutdown) to fail at all, even after several thousand test runs.

## Breaking Changes

None

## Notes & open questions

Can somebody talk me out of this? I would prefer to keep flume, but the evidence above seems conclusive...

Note: why not tokio::sync::mpsc::channel? I need an mpmc channel. The handle can be cloned, and can send to any of the n worker threads.

## Change checklist

- [x] Self-review.
- [x] ~~Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant.~~
- [x] ~~Tests if relevant.~~
- [x] ~~All breaking changes documented.~~
1 parent 5c60a52 commit 9052905

File tree

3 files changed

+47
-13
lines changed

3 files changed

+47
-13
lines changed

Cargo.lock

Lines changed: 35 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-blobs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ workspace = true
1717

1818
[dependencies]
1919
anyhow = { version = "1" }
20+
async-channel = "2.3.1"
2021
bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false }
2122
bytes = { version = "1.4", features = ["serde"] }
2223
chrono = "0.4.31"

iroh-blobs/src/util/local_pool.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl Deref for LocalPool {
6161
#[derive(Debug, Clone)]
6262
pub struct LocalPoolHandle {
6363
/// The sender half of the channel used to send tasks to the pool
64-
send: flume::Sender<Message>,
64+
send: async_channel::Sender<Message>,
6565
}
6666

6767
/// What to do when a panic occurs in a pool thread
@@ -124,7 +124,7 @@ impl LocalPool {
124124
panic_mode,
125125
} = config;
126126
let cancel_token = CancellationToken::new();
127-
let (send, recv) = flume::unbounded::<Message>();
127+
let (send, recv) = async_channel::unbounded::<Message>();
128128
let shutdown_sem = Arc::new(Semaphore::new(0));
129129
let handle = tokio::runtime::Handle::current();
130130
let handles = (0..threads)
@@ -159,7 +159,7 @@ impl LocalPool {
159159
/// Spawn a new pool thread.
160160
fn spawn_pool_thread(
161161
thread_name: String,
162-
recv: flume::Receiver<Message>,
162+
recv: async_channel::Receiver<Message>,
163163
cancel_token: CancellationToken,
164164
panic_mode: PanicMode,
165165
shutdown_sem: Arc<Semaphore>,
@@ -198,18 +198,18 @@ impl LocalPool {
198198
// if the cancel token is cancelled, break the loop immediately
199199
_ = cancel_token.cancelled() => break ShutdownMode::Stop,
200200
// if we receive a message, execute it
201-
msg = recv.recv_async() => {
201+
msg = recv.recv() => {
202202
match msg {
203-
// just push into the FuturesUnordered
203+
// just push into the join set
204204
Ok(Message::Execute(f)) => {
205205
s.spawn_local((f)());
206206
}
207207
// break with optional semaphore
208208
Ok(Message::Finish) => break ShutdownMode::Finish,
209209
// if the sender is dropped, break the loop immediately
210-
Err(flume::RecvError::Disconnected) => break ShutdownMode::Stop,
210+
Err(async_channel::RecvError) => break ShutdownMode::Stop,
211211
}
212-
}
212+
},
213213
}
214214
}
215215
}));
@@ -247,7 +247,7 @@ impl LocalPool {
247247

248248
/// Immediately stop polling all tasks and wait for all threads to finish.
249249
///
250-
/// This is like droo, but waits for thread completion asynchronously.
250+
/// This is like drop, but waits for thread completion asynchronously.
251251
///
252252
/// If there was a panic on any of the threads, it will be re-thrown here.
253253
pub async fn shutdown(self) {
@@ -270,14 +270,13 @@ impl LocalPool {
270270
// we assume that there are exactly as many threads as there are handles.
271271
// also, we assume that the threads are still running.
272272
for _ in 0..self.threads_u32() {
273-
println!("sending shutdown message");
274273
// send the shutdown message
275274
// sending will fail if all threads are already finished, but
276275
// in that case we don't need to do anything.
277276
//
278277
// Threads will add a permit in any case, so await_thread_completion
279278
// will then immediately return.
280-
self.send.send(Message::Finish).ok();
279+
self.send.send(Message::Finish).await.ok();
281280
}
282281
self.await_thread_completion().await;
283282
}
@@ -460,7 +459,7 @@ impl LocalPoolHandle {
460459
/// spawn a task in the pool.
461460
pub fn try_spawn_detached_boxed(&self, gen: SpawnFn) -> SpawnResult<()> {
462461
self.send
463-
.send(Message::Execute(gen))
462+
.send_blocking(Message::Execute(gen))
464463
.map_err(|_| SpawnError::Cancelled)
465464
}
466465
}
@@ -593,7 +592,7 @@ mod tests {
593592
}
594593

595594
#[tokio::test]
596-
async fn test_shutdown() {
595+
async fn test_finish() {
597596
let _ = tracing_subscriber::fmt::try_init();
598597
let pool = LocalPool::new(Config::default());
599598
let counter = Arc::new(AtomicU64::new(0));

0 commit comments

Comments
 (0)