Skip to content

Commit 36424f4

Browse files
committed
feat: Add no_std implementation
This commit adds an implementation of the event-listener algorithm built without locks. This is a replacement of the old no_std backend. It is written without concurrent-queue and therefore closes #109. The idea behind this implementation is to store the listeners in "slots" in an infinitely large list. Then, assign each listener a number and then use that to wait on each listener. The list used is similar to the one used by the thread-local crate. It consists of a list of "buckets" that hold slots. The number of slots increases in an amortized way. The first slot holds 1, the second slot holds 2, the third slot holds 4... all the way up to usize::MAX slots. Indexes are done by having a list of reusable indexes and using those when possible, only increasing the max index when necessary. This part of the code could use some work; under contention it's possible to make some slots unusuable. This can happen under two cases: - If there is contention on the indexes list when the listener is being freed. - If the slot is still being notified when it is attempted to be reused. Both of these cases are probably fixable, and should be fixed before release. Otherwise long running server processes using this code will run out of memory under heavy loads. From here the rest of the implementation is an atomic linked list based on the above primitives. It functions very similarly to the std variant. The main difference is that the Link structure's waker functions very similarly to AtomicWaker from the atomic-waker crate. Aside from that the code isn't very interesting on its own. Signed-off-by: John Nunley <[email protected]>
1 parent 8aa7635 commit 36424f4

File tree

8 files changed

+1053
-197
lines changed

8 files changed

+1053
-197
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ portable-atomic-crate = { version = "1.6.0", package = "portable-atomic", option
2424
portable-atomic-util = { version = "0.1.5", optional = true }
2525

2626
[dev-dependencies]
27+
ahash = { version = "0.8.11", default-features = false, features = ["compile-time-rng"] }
2728
futures = { version = "0.3", default-features = false, features = ["std"] }
2829
futures-lite = "2.3.0"
30+
hashbrown = { version = "0.14.3", default-features = false }
2931
waker-fn = "1"

examples/mutex.rs

Lines changed: 149 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -4,182 +4,196 @@
44
55
#![allow(dead_code)]
66

7-
use std::cell::UnsafeCell;
8-
use std::ops::{Deref, DerefMut};
9-
use std::sync::atomic::{AtomicBool, Ordering};
10-
use std::sync::{mpsc, Arc};
11-
use std::thread;
12-
use std::time::{Duration, Instant};
13-
14-
use event_listener::{Event, Listener};
15-
16-
/// A simple mutex.
17-
struct Mutex<T> {
18-
/// Set to `true` when the mutex is locked.
19-
locked: AtomicBool,
20-
21-
/// Blocked lock operations.
22-
lock_ops: Event,
23-
24-
/// The inner protected data.
25-
data: UnsafeCell<T>,
26-
}
7+
#[cfg(feature = "std")]
8+
mod ex {
9+
use std::cell::UnsafeCell;
10+
use std::ops::{Deref, DerefMut};
11+
use std::sync::atomic::{AtomicBool, Ordering};
12+
use std::sync::{mpsc, Arc};
13+
use std::thread;
14+
use std::time::{Duration, Instant};
15+
16+
use event_listener::{Event, Listener};
17+
18+
/// A simple mutex.
19+
struct Mutex<T> {
20+
/// Set to `true` when the mutex is locked.
21+
locked: AtomicBool,
22+
23+
/// Blocked lock operations.
24+
lock_ops: Event,
25+
26+
/// The inner protected data.
27+
data: UnsafeCell<T>,
28+
}
2729

28-
unsafe impl<T: Send> Send for Mutex<T> {}
29-
unsafe impl<T: Send> Sync for Mutex<T> {}
30+
unsafe impl<T: Send> Send for Mutex<T> {}
31+
unsafe impl<T: Send> Sync for Mutex<T> {}
3032

31-
impl<T> Mutex<T> {
32-
/// Creates a mutex.
33-
fn new(t: T) -> Mutex<T> {
34-
Mutex {
35-
locked: AtomicBool::new(false),
36-
lock_ops: Event::new(),
37-
data: UnsafeCell::new(t),
33+
impl<T> Mutex<T> {
34+
/// Creates a mutex.
35+
fn new(t: T) -> Mutex<T> {
36+
Mutex {
37+
locked: AtomicBool::new(false),
38+
lock_ops: Event::new(),
39+
data: UnsafeCell::new(t),
40+
}
3841
}
39-
}
4042

41-
/// Attempts to acquire a lock.
42-
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
43-
if !self.locked.swap(true, Ordering::Acquire) {
44-
Some(MutexGuard(self))
45-
} else {
46-
None
43+
/// Attempts to acquire a lock.
44+
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
45+
if !self.locked.swap(true, Ordering::Acquire) {
46+
Some(MutexGuard(self))
47+
} else {
48+
None
49+
}
4750
}
48-
}
49-
50-
/// Blocks until a lock is acquired.
51-
fn lock(&self) -> MutexGuard<'_, T> {
52-
let mut listener = None;
5351

54-
loop {
55-
// Attempt grabbing a lock.
56-
if let Some(guard) = self.try_lock() {
57-
return guard;
58-
}
52+
/// Blocks until a lock is acquired.
53+
fn lock(&self) -> MutexGuard<'_, T> {
54+
let mut listener = None;
5955

60-
// Set up an event listener or wait for a notification.
61-
match listener.take() {
62-
None => {
63-
// Start listening and then try locking again.
64-
listener = Some(self.lock_ops.listen());
56+
loop {
57+
// Attempt grabbing a lock.
58+
if let Some(guard) = self.try_lock() {
59+
return guard;
6560
}
66-
Some(l) => {
67-
// Wait until a notification is received.
68-
l.wait();
61+
62+
// Set up an event listener or wait for a notification.
63+
match listener.take() {
64+
None => {
65+
// Start listening and then try locking again.
66+
listener = Some(self.lock_ops.listen());
67+
}
68+
Some(l) => {
69+
// Wait until a notification is received.
70+
l.wait();
71+
}
6972
}
7073
}
7174
}
72-
}
73-
74-
/// Blocks until a lock is acquired or the timeout is reached.
75-
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
76-
let deadline = Instant::now() + timeout;
77-
let mut listener = None;
7875

79-
loop {
80-
// Attempt grabbing a lock.
81-
if let Some(guard) = self.try_lock() {
82-
return Some(guard);
83-
}
76+
/// Blocks until a lock is acquired or the timeout is reached.
77+
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
78+
let deadline = Instant::now() + timeout;
79+
let mut listener = None;
8480

85-
// Set up an event listener or wait for an event.
86-
match listener.take() {
87-
None => {
88-
// Start listening and then try locking again.
89-
listener = Some(self.lock_ops.listen());
81+
loop {
82+
// Attempt grabbing a lock.
83+
if let Some(guard) = self.try_lock() {
84+
return Some(guard);
9085
}
91-
Some(l) => {
92-
// Wait until a notification is received.
93-
if l.wait_deadline(deadline).is_none() {
94-
return None;
86+
87+
// Set up an event listener or wait for an event.
88+
match listener.take() {
89+
None => {
90+
// Start listening and then try locking again.
91+
listener = Some(self.lock_ops.listen());
92+
}
93+
Some(l) => {
94+
// Wait until a notification is received.
95+
if l.wait_deadline(deadline).is_none() {
96+
return None;
97+
}
9598
}
9699
}
97100
}
98101
}
99-
}
100102

101-
/// Acquires a lock asynchronously.
102-
async fn lock_async(&self) -> MutexGuard<'_, T> {
103-
let mut listener = None;
104-
105-
loop {
106-
// Attempt grabbing a lock.
107-
if let Some(guard) = self.try_lock() {
108-
return guard;
109-
}
103+
/// Acquires a lock asynchronously.
104+
async fn lock_async(&self) -> MutexGuard<'_, T> {
105+
let mut listener = None;
110106

111-
// Set up an event listener or wait for an event.
112-
match listener.take() {
113-
None => {
114-
// Start listening and then try locking again.
115-
listener = Some(self.lock_ops.listen());
107+
loop {
108+
// Attempt grabbing a lock.
109+
if let Some(guard) = self.try_lock() {
110+
return guard;
116111
}
117-
Some(l) => {
118-
// Wait until a notification is received.
119-
l.await;
112+
113+
// Set up an event listener or wait for an event.
114+
match listener.take() {
115+
None => {
116+
// Start listening and then try locking again.
117+
listener = Some(self.lock_ops.listen());
118+
}
119+
Some(l) => {
120+
// Wait until a notification is received.
121+
l.await;
122+
}
120123
}
121124
}
122125
}
123126
}
124-
}
125127

126-
/// A guard holding a lock.
127-
struct MutexGuard<'a, T>(&'a Mutex<T>);
128+
/// A guard holding a lock.
129+
struct MutexGuard<'a, T>(&'a Mutex<T>);
128130

129-
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
130-
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
131+
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
132+
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
131133

132-
impl<T> Drop for MutexGuard<'_, T> {
133-
fn drop(&mut self) {
134-
self.0.locked.store(false, Ordering::Release);
135-
self.0.lock_ops.notify(1);
134+
impl<T> Drop for MutexGuard<'_, T> {
135+
fn drop(&mut self) {
136+
self.0.locked.store(false, Ordering::Release);
137+
self.0.lock_ops.notify(1);
138+
}
136139
}
137-
}
138140

139-
impl<T> Deref for MutexGuard<'_, T> {
140-
type Target = T;
141+
impl<T> Deref for MutexGuard<'_, T> {
142+
type Target = T;
141143

142-
fn deref(&self) -> &T {
143-
unsafe { &*self.0.data.get() }
144+
fn deref(&self) -> &T {
145+
unsafe { &*self.0.data.get() }
146+
}
144147
}
145-
}
146148

147-
impl<T> DerefMut for MutexGuard<'_, T> {
148-
fn deref_mut(&mut self) -> &mut T {
149-
unsafe { &mut *self.0.data.get() }
149+
impl<T> DerefMut for MutexGuard<'_, T> {
150+
fn deref_mut(&mut self) -> &mut T {
151+
unsafe { &mut *self.0.data.get() }
152+
}
150153
}
151-
}
152154

153-
fn main() {
154-
const N: usize = 10;
155+
pub(crate) fn entry() {
156+
const N: usize = 10;
155157

156-
// A shared counter.
157-
let counter = Arc::new(Mutex::new(0));
158+
// A shared counter.
159+
let counter = Arc::new(Mutex::new(0));
158160

159-
// A channel that signals when all threads are done.
160-
let (tx, rx) = mpsc::channel();
161+
// A channel that signals when all threads are done.
162+
let (tx, rx) = mpsc::channel();
161163

162-
// Spawn a bunch of threads incrementing the counter.
163-
for _ in 0..N {
164-
let counter = counter.clone();
165-
let tx = tx.clone();
164+
// Spawn a bunch of threads incrementing the counter.
165+
for _ in 0..N {
166+
let counter = counter.clone();
167+
let tx = tx.clone();
166168

167-
thread::spawn(move || {
168-
let mut counter = counter.lock();
169-
*counter += 1;
169+
thread::spawn(move || {
170+
let mut counter = counter.lock();
171+
*counter += 1;
170172

171-
// If this is the last increment, signal that we're done.
172-
if *counter == N {
173-
tx.send(()).unwrap();
174-
}
175-
});
176-
}
173+
// If this is the last increment, signal that we're done.
174+
if *counter == N {
175+
tx.send(()).unwrap();
176+
}
177+
});
178+
}
177179

178-
// Wait until the last thread increments the counter.
179-
rx.recv().unwrap();
180+
// Wait until the last thread increments the counter.
181+
rx.recv().unwrap();
180182

181-
// The counter must equal the number of threads.
182-
assert_eq!(*counter.lock(), N);
183+
// The counter must equal the number of threads.
184+
assert_eq!(*counter.lock(), N);
183185

184-
println!("Done!");
186+
println!("Done!");
187+
}
188+
}
189+
190+
#[cfg(not(feature = "std"))]
191+
mod ex {
192+
pub(crate) fn entry() {
193+
eprintln!("this example requires the 'std' feature")
194+
}
195+
}
196+
197+
fn main() {
198+
ex::entry();
185199
}

0 commit comments

Comments
 (0)