Skip to content

Commit 3a6e800

Browse files
authored
refactor: time driver APIs (#20)
Signed-off-by: tison <[email protected]>
1 parent 137d578 commit 3a6e800

File tree

3 files changed

+112
-82
lines changed

3 files changed

+112
-82
lines changed

fastimer-driver/src/heap.rs

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2024 FastLabs Developers
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BinaryHeap;
16+
use std::ops::ControlFlow;
17+
use std::sync::atomic;
18+
use std::sync::atomic::AtomicBool;
19+
use std::sync::Arc;
20+
use std::time::Duration;
21+
use std::time::Instant;
22+
23+
use crossbeam_queue::SegQueue;
24+
use parking::Parker;
25+
use parking::Unparker;
26+
27+
use crate::TimeContext;
28+
use crate::TimeDriverShutdown;
29+
use crate::TimeEntry;
30+
31+
/// Returns a new time driver, its time context and the shutdown handle.
32+
pub fn binary_heap_driver() -> (BinaryHeapTimeDriver, TimeContext, TimeDriverShutdown) {
33+
let (parker, unparker) = parking::pair();
34+
let timers = BinaryHeap::new();
35+
let inbounds = Arc::new(SegQueue::new());
36+
let shutdown = Arc::new(AtomicBool::new(false));
37+
38+
let driver = BinaryHeapTimeDriver {
39+
parker,
40+
unparker,
41+
timers,
42+
inbounds,
43+
shutdown,
44+
};
45+
46+
let context = TimeContext {
47+
unparker: driver.unparker.clone(),
48+
inbounds: driver.inbounds.clone(),
49+
};
50+
51+
let shutdown = TimeDriverShutdown {
52+
unparker: driver.unparker.clone(),
53+
shutdown: driver.shutdown.clone(),
54+
};
55+
56+
(driver, context, shutdown)
57+
}
58+
59+
/// A heap-based time driver that drives registered timers.
60+
#[derive(Debug)]
61+
pub struct BinaryHeapTimeDriver {
62+
parker: Parker,
63+
unparker: Unparker,
64+
timers: BinaryHeap<TimeEntry>,
65+
inbounds: Arc<SegQueue<TimeEntry>>,
66+
shutdown: Arc<AtomicBool>,
67+
}
68+
69+
impl BinaryHeapTimeDriver {
70+
/// Drives the timers and returns `true` if the driver has been shut down.
71+
pub fn turn(&mut self) -> ControlFlow<()> {
72+
if self.shutdown.load(atomic::Ordering::Acquire) {
73+
return ControlFlow::Break(());
74+
}
75+
76+
match self.timers.peek() {
77+
None => self.parker.park(),
78+
Some(entry) => {
79+
let delta = entry.when.saturating_duration_since(Instant::now());
80+
if delta > Duration::ZERO {
81+
self.parker.park_timeout(delta);
82+
}
83+
}
84+
}
85+
86+
while let Some(entry) = self.inbounds.pop() {
87+
self.timers.push(entry);
88+
}
89+
90+
while let Some(entry) = self.timers.peek() {
91+
if entry.when <= Instant::now() {
92+
entry.waker.wake();
93+
let _ = self.timers.pop();
94+
} else {
95+
break;
96+
}
97+
}
98+
99+
if self.shutdown.load(atomic::Ordering::Acquire) {
100+
ControlFlow::Break(())
101+
} else {
102+
ControlFlow::Continue(())
103+
}
104+
}
105+
}

fastimer-driver/src/lib.rs

+3-77
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Runtime-agnostic time driver for creating delay futures.
1919
2020
use std::cmp;
21-
use std::collections::BinaryHeap;
2221
use std::future::Future;
2322
use std::pin::Pin;
2423
use std::sync::atomic;
@@ -33,11 +32,10 @@ use atomic_waker::AtomicWaker;
3332
use crossbeam_queue::SegQueue;
3433
use fastimer::make_instant_from_now;
3534
use fastimer::MakeDelay;
36-
use parking::Parker;
3735
use parking::Unparker;
3836

39-
#[cfg(test)]
40-
mod tests;
37+
mod heap;
38+
pub use heap::*;
4139

4240
#[derive(Debug)]
4341
struct TimeEntry {
@@ -55,7 +53,7 @@ impl Eq for TimeEntry {}
5553

5654
impl PartialOrd for TimeEntry {
5755
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
58-
Some(self.when.cmp(&other.when))
56+
Some(self.cmp(other))
5957
}
6058
}
6159

@@ -96,34 +94,6 @@ impl Drop for Delay {
9694
}
9795
}
9896

99-
/// Returns a new time driver, its time context and the shutdown handle.
100-
pub fn driver() -> (TimeDriver, TimeContext, TimeDriverShutdown) {
101-
let (parker, unparker) = parking::pair();
102-
let timers = BinaryHeap::new();
103-
let inbounds = Arc::new(SegQueue::new());
104-
let shutdown = Arc::new(AtomicBool::new(false));
105-
106-
let driver = TimeDriver {
107-
parker,
108-
unparker,
109-
timers,
110-
inbounds,
111-
shutdown,
112-
};
113-
114-
let context = TimeContext {
115-
unparker: driver.unparker.clone(),
116-
inbounds: driver.inbounds.clone(),
117-
};
118-
119-
let shutdown = TimeDriverShutdown {
120-
unparker: driver.unparker.clone(),
121-
shutdown: driver.shutdown.clone(),
122-
};
123-
124-
(driver, context, shutdown)
125-
}
126-
12797
/// A time context for creating [`Delay`]s.
12898
#[derive(Debug, Clone)]
12999
pub struct TimeContext {
@@ -165,50 +135,6 @@ impl TimeDriverShutdown {
165135
}
166136
}
167137

168-
/// A time driver that drives registered timers.
169-
#[derive(Debug)]
170-
pub struct TimeDriver {
171-
parker: Parker,
172-
unparker: Unparker,
173-
timers: BinaryHeap<TimeEntry>,
174-
inbounds: Arc<SegQueue<TimeEntry>>,
175-
shutdown: Arc<AtomicBool>,
176-
}
177-
178-
impl TimeDriver {
179-
/// Drives the timers and returns `true` if the driver has been shut down.
180-
pub fn turn(&mut self) -> bool {
181-
if self.shutdown.load(atomic::Ordering::Acquire) {
182-
return true;
183-
}
184-
185-
match self.timers.peek() {
186-
None => self.parker.park(),
187-
Some(entry) => {
188-
let delta = entry.when.saturating_duration_since(Instant::now());
189-
if delta > Duration::ZERO {
190-
self.parker.park_timeout(delta);
191-
}
192-
}
193-
}
194-
195-
while let Some(entry) = self.inbounds.pop() {
196-
self.timers.push(entry);
197-
}
198-
199-
while let Some(entry) = self.timers.peek() {
200-
if entry.when <= Instant::now() {
201-
entry.waker.wake();
202-
let _ = self.timers.pop();
203-
} else {
204-
break;
205-
}
206-
}
207-
208-
self.shutdown.load(atomic::Ordering::Acquire)
209-
}
210-
}
211-
212138
/// A delay implementation that uses the given time context.
213139
#[derive(Debug, Clone)]
214140
pub struct MakeFastimerDelay(TimeContext);

fastimer-driver/src/tests.rs renamed to fastimer-driver/tests/integration.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ use std::time::Duration;
1616
use std::time::Instant;
1717

1818
use fastimer::make_instant_from_now;
19-
20-
use crate::driver;
19+
use fastimer_driver::binary_heap_driver;
2120

2221
fn assert_duration_eq(actual: Duration, expected: Duration) {
2322
if expected.abs_diff(expected) > Duration::from_millis(5) {
@@ -26,11 +25,11 @@ fn assert_duration_eq(actual: Duration, expected: Duration) {
2625
}
2726

2827
#[test]
29-
fn test_simple_driver() {
30-
let (mut driver, context, shutdown) = driver();
28+
fn test_binary_heap_driver() {
29+
let (mut driver, context, shutdown) = binary_heap_driver();
3130
let (tx, rx) = std::sync::mpsc::channel();
3231
std::thread::spawn(move || loop {
33-
if driver.turn() {
32+
if driver.turn().is_break() {
3433
tx.send(()).unwrap();
3534
break;
3635
}

0 commit comments

Comments
 (0)