Skip to content

Commit d5da4ea

Browse files
author
Michael-F-Bryan
committed
feat!: The @wasmer/sdk Runtime has removed the limit on the maximum number of worker threads it is allowed to spawn. The corresponding poolSize option has been removed from RuntimeOptions.
1 parent c13435f commit d5da4ea

File tree

5 files changed

+29
-87
lines changed

5 files changed

+29
-87
lines changed

src/js_runtime.rs

+1-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{
2-
num::NonZeroUsize,
32
ops::{Deref, DerefMut},
43
sync::Arc,
54
};
@@ -33,15 +32,7 @@ impl JsRuntime {
3332
impl JsRuntime {
3433
#[wasm_bindgen(constructor)]
3534
pub fn js_new(options: Option<RuntimeOptions>) -> Result<JsRuntime, Error> {
36-
let pool_size = options.as_ref().and_then(|options| options.pool_size());
37-
38-
let pool = match pool_size {
39-
Some(size) => {
40-
let size = NonZeroUsize::new(size).unwrap_or(NonZeroUsize::MIN);
41-
ThreadPool::new(size)
42-
}
43-
None => ThreadPool::new_with_max_threads()?,
44-
};
35+
let pool = ThreadPool::new();
4536

4637
let registry = match options.as_ref().and_then(|opts| opts.registry()) {
4738
Some(registry_url) => registry_url.resolve(),
@@ -108,10 +99,6 @@ const RUNTIME_OPTIONS_TYPE_DECLARATION: &str = r#"
10899
* Options used when constructing a {@link Runtime}.
109100
*/
110101
export type RuntimeOptions = {
111-
/**
112-
* The number of worker threads to use.
113-
*/
114-
poolSize?: number;
115102
/**
116103
* The GraphQL endpoint for the Wasmer registry used when looking up
117104
* packages.
@@ -137,9 +124,6 @@ extern "C" {
137124
#[wasm_bindgen(typescript_type = "RuntimeOptions")]
138125
pub type RuntimeOptions;
139126

140-
#[wasm_bindgen(method, getter, js_name = "poolSize")]
141-
fn pool_size(this: &RuntimeOptions) -> Option<usize>;
142-
143127
#[wasm_bindgen(method, getter)]
144128
fn registry(this: &RuntimeOptions) -> Option<MaybeRegistryUrl>;
145129

src/runtime.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl Runtime {
7070
}
7171

7272
pub(crate) fn with_defaults() -> Result<Self, Error> {
73-
let pool = ThreadPool::new_with_max_threads()?;
73+
let pool = ThreadPool::new();
7474
let mut rt = Runtime::new(pool);
7575

7676
rt.set_registry(crate::DEFAULT_REGISTRY, None)?;

src/tasks/post_message_payload.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ mod tests {
300300
let engine = wasmer::Engine::default();
301301
let module = wasmer::Module::new(&engine, wasm).unwrap();
302302
let flag = Arc::new(AtomicBool::new(false));
303-
let pool = ThreadPool::new(NonZeroUsize::MAX);
303+
let pool = ThreadPool::new();
304304
let runtime = Runtime::new(pool);
305305
let env = WasiEnvBuilder::new("program")
306306
.runtime(Arc::new(runtime))

src/tasks/scheduler.rs

+16-48
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{
22
collections::{BTreeMap, VecDeque},
33
fmt::Debug,
4-
num::NonZeroUsize,
54
sync::atomic::{AtomicU32, Ordering},
65
};
76

@@ -22,21 +21,20 @@ use crate::tasks::{
2221
#[derive(Debug, Clone)]
2322
pub(crate) struct Scheduler {
2423
scheduler_thread_id: u32,
25-
capacity: NonZeroUsize,
2624
channel: UnboundedSender<SchedulerMessage>,
2725
}
2826

2927
impl Scheduler {
3028
/// Spin up a scheduler on the current thread and get a channel that can be
3129
/// used to communicate with it.
32-
pub(crate) fn spawn(capacity: NonZeroUsize) -> Scheduler {
30+
pub(crate) fn spawn() -> Scheduler {
3331
let (sender, mut receiver) = mpsc::unbounded_channel();
3432

3533
let thread_id = wasmer::current_thread_id();
3634
// Safety: we just got the thread ID.
37-
let sender = unsafe { Scheduler::new(sender, thread_id, capacity) };
35+
let sender = unsafe { Scheduler::new(sender, thread_id) };
3836

39-
let mut scheduler = SchedulerState::new(capacity, sender.clone());
37+
let mut scheduler = SchedulerState::new(sender.clone());
4038

4139
tracing::debug!(thread_id, "Spinning up the scheduler");
4240
wasm_bindgen_futures::spawn_local(
@@ -67,16 +65,11 @@ impl Scheduler {
6765
///
6866
/// The `scheduler_thread_id` must match the [`wasmer::current_thread_id()`]
6967
/// otherwise these `!Send` values will be sent between threads.
70-
unsafe fn new(
71-
channel: UnboundedSender<SchedulerMessage>,
72-
scheduler_thread_id: u32,
73-
capacity: NonZeroUsize,
74-
) -> Self {
68+
unsafe fn new(channel: UnboundedSender<SchedulerMessage>, scheduler_thread_id: u32) -> Self {
7569
debug_assert_eq!(scheduler_thread_id, wasmer::current_thread_id());
7670
Scheduler {
7771
channel,
7872
scheduler_thread_id,
79-
capacity,
8073
}
8174
}
8275

@@ -102,10 +95,6 @@ impl Scheduler {
10295
Ok(())
10396
}
10497
}
105-
106-
pub(crate) fn capacity(&self) -> NonZeroUsize {
107-
self.capacity
108-
}
10998
}
11099

111100
// Safety: The only way our !Send messages will be sent to the scheduler is if
@@ -117,8 +106,6 @@ unsafe impl Sync for Scheduler {}
117106
/// The state for the actor in charge of the threadpool.
118107
#[derive(Debug)]
119108
struct SchedulerState {
120-
/// The maximum number of workers we will start.
121-
capacity: NonZeroUsize,
122109
/// Workers that are able to receive work.
123110
idle: VecDeque<WorkerHandle>,
124111
/// Workers that are currently blocked on synchronous operations and can't
@@ -130,9 +117,8 @@ struct SchedulerState {
130117
}
131118

132119
impl SchedulerState {
133-
fn new(capacity: NonZeroUsize, mailbox: Scheduler) -> Self {
120+
fn new(mailbox: Scheduler) -> Self {
134121
SchedulerState {
135-
capacity,
136122
idle: VecDeque::new(),
137123
busy: VecDeque::new(),
138124
mailbox,
@@ -213,14 +199,14 @@ impl SchedulerState {
213199
/// Send a task to one of the worker threads, preferring workers that aren't
214200
/// running synchronous work.
215201
fn post_message(&mut self, msg: PostMessagePayload) -> Result<(), Error> {
216-
let (worker, already_blocked) = self.next_available_worker()?;
202+
let worker = self.next_available_worker()?;
217203

218204
let would_block = msg.would_block();
219205
worker
220206
.send(msg)
221207
.with_context(|| format!("Unable to send a message to worker {}", worker.id()))?;
222208

223-
if would_block || already_blocked {
209+
if would_block {
224210
self.busy.push_back(worker);
225211
} else {
226212
self.idle.push_back(worker);
@@ -229,43 +215,25 @@ impl SchedulerState {
229215
Ok(())
230216
}
231217

232-
fn next_available_worker(&mut self) -> Result<(WorkerHandle, bool), Error> {
218+
fn next_available_worker(&mut self) -> Result<WorkerHandle, Error> {
233219
// First, try to send the message to an idle worker
234220
if let Some(worker) = self.idle.pop_front() {
235221
tracing::trace!(
236222
worker.id = worker.id(),
237223
"Sending the message to an idle worker"
238224
);
239-
return Ok((worker, false));
225+
return Ok(worker);
240226
}
241227

242-
if self.busy.len() + self.idle.len() < self.capacity.get() {
243-
// Rather than sending the task to one of the blocking workers,
244-
// let's spawn a new worker
245-
246-
let worker = self.start_worker()?;
247-
tracing::trace!(
248-
worker.id = worker.id(),
249-
"Sending the message to a new worker"
250-
);
251-
return Ok((worker, false));
252-
}
253-
254-
// Oh well, looks like there aren't any more idle workers and we can't
255-
// spin up any new workers, so we'll need to add load to a worker that
256-
// is already blocking.
257-
//
258-
// Note: This shouldn't panic because if there were no idle workers and
259-
// we didn't start a new worker, there should always be at least one
260-
// busy worker because our capacity is non-zero.
261-
let worker = self.busy.pop_front().unwrap();
228+
// Rather than sending the task to one of the blocking workers,
229+
// let's spawn a new worker
262230

231+
let worker = self.start_worker()?;
263232
tracing::trace!(
264233
worker.id = worker.id(),
265-
"Sending the message to a busy worker"
234+
"Sending the message to a new worker"
266235
);
267-
268-
Ok((worker, true))
236+
Ok(worker)
269237
}
270238

271239
fn start_worker(&mut self) -> Result<WorkerHandle, Error> {
@@ -309,8 +277,8 @@ mod tests {
309277
async fn spawn_an_async_function() {
310278
let (sender, receiver) = oneshot::channel();
311279
let (tx, _) = mpsc::unbounded_channel();
312-
let tx = unsafe { Scheduler::new(tx, wasmer::current_thread_id(), NonZeroUsize::MAX) };
313-
let mut scheduler = SchedulerState::new(NonZeroUsize::MAX, tx);
280+
let tx = unsafe { Scheduler::new(tx, wasmer::current_thread_id()) };
281+
let mut scheduler = SchedulerState::new(tx);
314282
let message = SchedulerMessage::SpawnAsync(Box::new(move || {
315283
Box::pin(async move {
316284
let _ = sender.send(42);

src/tasks/thread_pool.rs

+10-20
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use std::{fmt::Debug, future::Future, num::NonZeroUsize, pin::Pin};
1+
use std::{fmt::Debug, future::Future, pin::Pin};
22

3-
use anyhow::Context;
43
use futures::future::LocalBoxFuture;
54
use instant::Duration;
65
use wasm_bindgen_futures::JsFuture;
@@ -18,23 +17,11 @@ pub struct ThreadPool {
1817
}
1918

2019
impl ThreadPool {
21-
pub fn new(capacity: NonZeroUsize) -> Self {
22-
let sender = Scheduler::spawn(capacity);
20+
pub fn new() -> Self {
21+
let sender = Scheduler::spawn();
2322
ThreadPool { scheduler: sender }
2423
}
2524

26-
pub fn new_with_max_threads() -> Result<ThreadPool, anyhow::Error> {
27-
let concurrency = crate::utils::GlobalScope::current()
28-
.hardware_concurrency()
29-
.context("Unable to determine the hardware concurrency")?;
30-
// Note: We want to deliberately over-commit to avoid accidental
31-
// deadlocks.
32-
let concurrency = concurrency
33-
.checked_mul(NonZeroUsize::new(16).unwrap())
34-
.unwrap();
35-
Ok(ThreadPool::new(concurrency))
36-
}
37-
3825
/// Run an `async` function to completion on the threadpool.
3926
pub fn spawn(
4027
&self,
@@ -113,7 +100,10 @@ impl VirtualTaskManager for ThreadPool {
113100

114101
/// Returns the amount of parallelism that is possible on this platform
115102
fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
116-
Ok(self.scheduler.capacity().get())
103+
match crate::utils::GlobalScope::current().hardware_concurrency() {
104+
Some(n) => Ok(n.get()),
105+
None => Err(WasiThreadError::Unsupported),
106+
}
117107
}
118108

119109
fn spawn_with_module(
@@ -148,7 +138,7 @@ mod tests {
148138
.dyn_into()
149139
.unwrap();
150140
let module = wasmer::Module::from(module);
151-
let pool = ThreadPool::new_with_max_threads().unwrap();
141+
let pool = ThreadPool::new();
152142

153143
let (sender, receiver) = oneshot::channel();
154144
pool.spawn_with_module(
@@ -166,7 +156,7 @@ mod tests {
166156

167157
#[wasm_bindgen_test]
168158
async fn spawned_tasks_can_communicate_with_the_main_thread() {
169-
let pool = ThreadPool::new(2.try_into().unwrap());
159+
let pool = ThreadPool::new();
170160
let (sender, receiver) = oneshot::channel();
171161

172162
pool.task_shared(Box::new(move || {
@@ -203,7 +193,7 @@ mod tests {
203193
let (sender_1, receiver_1) = oneshot::channel();
204194
let (sender_2, mut receiver_2) = oneshot::channel();
205195
// Set things up so we can run 2 blocking tasks at the same time.
206-
let pool = ThreadPool::new(NonZeroUsize::new(2).unwrap());
196+
let pool = ThreadPool::new();
207197

208198
// Note: The second task depends on the first one completing
209199
let first_task = Box::new(move || sender_1.send(()).unwrap());

0 commit comments

Comments
 (0)