Skip to content

Commit 0b92f80

Browse files
authored
rt: rm internal Unpark types for Handle types (#5027)
This patch removes all internal `Unpark` runtime types. The runtime now uses the `Handle` types (`runtime::io::Handle` and `runtime::time::Handle`) to signal threads to unpark. Without separate `Unpark` types, future patches will be able to remove more `Arc`s used inside various drivers.
1 parent cba5c10 commit 0b92f80

File tree

11 files changed

+94
-152
lines changed

11 files changed

+94
-152
lines changed

tokio/src/process/unix/driver.rs

-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
//! Process driver.
44
55
use crate::process::unix::GlobalOrphanQueue;
6-
use crate::runtime::io::Handle;
76
use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
87

98
use std::time::Duration;
@@ -28,10 +27,6 @@ impl Driver {
2827
}
2928
}
3029

31-
pub(crate) fn unpark(&self) -> Handle {
32-
self.park.unpark()
33-
}
34-
3530
pub(crate) fn park(&mut self) {
3631
self.park.park();
3732
GlobalOrphanQueue::reap_orphans(&self.signal_handle);

tokio/src/runtime/driver.rs

+28-45
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ pub(crate) struct Cfg {
3838
pub(crate) start_paused: bool,
3939
}
4040

41-
pub(crate) type Unpark = TimerUnpark;
42-
4341
impl Driver {
4442
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
4543
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
@@ -60,10 +58,6 @@ impl Driver {
6058
))
6159
}
6260

63-
pub(crate) fn unpark(&self) -> TimerUnpark {
64-
self.inner.unpark()
65-
}
66-
6761
pub(crate) fn park(&mut self) {
6862
self.inner.park()
6963
}
@@ -77,11 +71,21 @@ impl Driver {
7771
}
7872
}
7973

74+
impl Handle {
75+
pub(crate) fn unpark(&self) {
76+
#[cfg(feature = "time")]
77+
if let Some(handle) = &self.time {
78+
handle.unpark();
79+
}
80+
81+
self.io.unpark();
82+
}
83+
}
84+
8085
// ===== io driver =====
8186

8287
cfg_io_driver! {
8388
pub(crate) type IoDriver = crate::runtime::io::Driver;
84-
pub(crate) type IoHandle = IoUnpark;
8589

8690
#[derive(Debug)]
8791
pub(crate) enum IoStack {
@@ -90,7 +94,7 @@ cfg_io_driver! {
9094
}
9195

9296
#[derive(Debug, Clone)]
93-
pub(crate) enum IoUnpark {
97+
pub(crate) enum IoHandle {
9498
Enabled(crate::runtime::io::Handle),
9599
Disabled(UnparkThread),
96100
}
@@ -106,23 +110,25 @@ cfg_io_driver! {
106110
let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
107111
let process_driver = create_process_driver(signal_driver);
108112

109-
(IoStack::Enabled(process_driver), IoUnpark::Enabled(io_handle), signal_handle)
113+
(IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
110114
} else {
111115
let park_thread = ParkThread::new();
112116
let unpark_thread = park_thread.unpark();
113-
(IoStack::Disabled(park_thread), IoUnpark::Disabled(unpark_thread), Default::default())
117+
(IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
114118
};
115119

116120
Ok(ret)
117121
}
118122

119123
impl IoStack {
120-
pub(crate) fn unpark(&self) -> IoUnpark {
124+
/*
125+
pub(crate) fn handle(&self) -> IoHandle {
121126
match self {
122-
IoStack::Enabled(v) => IoUnpark::Enabled(v.unpark()),
123-
IoStack::Disabled(v) => IoUnpark::Disabled(v.unpark()),
127+
IoStack::Enabled(v) => IoHandle::Enabled(v.handle()),
128+
IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()),
124129
}
125-
}
130+
}]
131+
*/
126132

127133
pub(crate) fn park(&mut self) {
128134
match self {
@@ -146,37 +152,36 @@ cfg_io_driver! {
146152
}
147153
}
148154

149-
impl IoUnpark {
155+
impl IoHandle {
150156
pub(crate) fn unpark(&self) {
151157
match self {
152-
IoUnpark::Enabled(v) => v.unpark(),
153-
IoUnpark::Disabled(v) => v.unpark(),
158+
IoHandle::Enabled(handle) => handle.unpark(),
159+
IoHandle::Disabled(handle) => handle.unpark(),
154160
}
155161
}
156162

157163
#[track_caller]
158164
pub(crate) fn expect(self, msg: &'static str) -> crate::runtime::io::Handle {
159165
match self {
160-
IoUnpark::Enabled(v) => v,
161-
IoUnpark::Disabled(..) => panic!("{}", msg),
166+
IoHandle::Enabled(v) => v,
167+
IoHandle::Disabled(..) => panic!("{}", msg),
162168
}
163169
}
164170

165171
cfg_unstable! {
166172
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
167173
match self {
168-
IoUnpark::Enabled(v) => Some(v),
169-
IoUnpark::Disabled(..) => None,
174+
IoHandle::Enabled(v) => Some(v),
175+
IoHandle::Disabled(..) => None,
170176
}
171177
}
172178
}
173179
}
174180
}
175181

176182
cfg_not_io_driver! {
177-
pub(crate) type IoHandle = IoUnpark;
183+
pub(crate) type IoHandle = UnparkThread;
178184
pub(crate) type IoStack = ParkThread;
179-
pub(crate) type IoUnpark = UnparkThread;
180185

181186
fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
182187
let park_thread = ParkThread::new();
@@ -249,11 +254,6 @@ cfg_time! {
249254
Disabled(IoStack),
250255
}
251256

252-
pub(crate) enum TimerUnpark {
253-
Enabled(crate::runtime::time::TimerUnpark),
254-
Disabled(IoUnpark),
255-
}
256-
257257
pub(crate) type Clock = crate::time::Clock;
258258
pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;
259259

@@ -276,13 +276,6 @@ cfg_time! {
276276
}
277277

278278
impl TimeDriver {
279-
pub(crate) fn unpark(&self) -> TimerUnpark {
280-
match self {
281-
TimeDriver::Enabled { driver, .. } => TimerUnpark::Enabled(driver.unpark()),
282-
TimeDriver::Disabled(v) => TimerUnpark::Disabled(v.unpark()),
283-
}
284-
}
285-
286279
pub(crate) fn park(&mut self) {
287280
match self {
288281
TimeDriver::Enabled { driver, handle } => driver.park(handle),
@@ -304,20 +297,10 @@ cfg_time! {
304297
}
305298
}
306299
}
307-
308-
impl TimerUnpark {
309-
pub(crate) fn unpark(&self) {
310-
match self {
311-
TimerUnpark::Enabled(v) => v.unpark(),
312-
TimerUnpark::Disabled(v) => v.unpark(),
313-
}
314-
}
315-
}
316300
}
317301

318302
cfg_not_time! {
319303
type TimeDriver = IoStack;
320-
type TimerUnpark = IoUnpark;
321304

322305
pub(crate) type Clock = ();
323306
pub(crate) type TimeHandle = ();

tokio/src/runtime/io/mod.rs

-5
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,6 @@ impl Driver {
144144
}
145145
}
146146

147-
// TODO: remove this in a later refactor
148-
pub(crate) fn unpark(&self) -> Handle {
149-
self.handle()
150-
}
151-
152147
pub(crate) fn park(&mut self) {
153148
self.turn(None);
154149
}

tokio/src/runtime/scheduler/current_thread.rs

+3-9
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::future::poll_fn;
22
use crate::loom::sync::atomic::AtomicBool;
33
use crate::loom::sync::{Arc, Mutex};
44
use crate::runtime::context::EnterGuard;
5-
use crate::runtime::driver::{self, Driver, Unpark};
5+
use crate::runtime::driver::{self, Driver};
66
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
77
use crate::runtime::{blocking, Config};
88
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
@@ -82,9 +82,6 @@ struct Shared {
8282
/// Collection of all active tasks spawned onto this executor.
8383
owned: OwnedTasks<Arc<Handle>>,
8484

85-
/// Unpark the blocked thread.
86-
unpark: Unpark,
87-
8885
/// Indicates whether the blocked on thread was woken.
8986
woken: AtomicBool,
9087

@@ -122,13 +119,10 @@ impl CurrentThread {
122119
seed_generator: RngSeedGenerator,
123120
config: Config,
124121
) -> CurrentThread {
125-
let unpark = driver.unpark();
126-
127122
let handle = Arc::new(Handle {
128123
shared: Shared {
129124
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
130125
owned: OwnedTasks::new(),
131-
unpark,
132126
woken: AtomicBool::new(false),
133127
config,
134128
scheduler_metrics: SchedulerMetrics::new(),
@@ -467,7 +461,7 @@ impl Schedule for Arc<Handle> {
467461
if let Some(queue) = guard.as_mut() {
468462
queue.push_back(task);
469463
drop(guard);
470-
self.shared.unpark.unpark();
464+
self.driver.unpark();
471465
}
472466
}
473467
});
@@ -511,7 +505,7 @@ impl Wake for Handle {
511505
/// Wake by reference
512506
fn wake_by_ref(arc_self: &Arc<Self>) {
513507
arc_self.shared.woken.store(true, Release);
514-
arc_self.shared.unpark.unpark();
508+
arc_self.driver.unpark();
515509
}
516510
}
517511

tokio/src/runtime/scheduler/multi_thread/handle.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl Handle {
3535
}
3636

3737
pub(crate) fn shutdown(&self) {
38-
self.shared.close();
38+
self.close();
3939
}
4040

4141
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
@@ -46,7 +46,7 @@ impl Handle {
4646
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
4747

4848
if let Some(notified) = notified {
49-
me.shared.schedule(notified, false);
49+
me.schedule_task(notified, false);
5050
}
5151

5252
handle

tokio/src/runtime/scheduler/multi_thread/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,11 @@ impl MultiThread {
3939
seed_generator: RngSeedGenerator,
4040
config: Config,
4141
) -> (MultiThread, Launch) {
42-
let driver_unpark = driver.unpark();
4342
let parker = Parker::new(driver);
4443
let (handle, launch) = worker::create(
4544
size,
4645
parker,
4746
driver_handle,
48-
driver_unpark,
4947
blocking_spawner,
5048
seed_generator,
5149
config,

tokio/src/runtime/scheduler/multi_thread/park.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl Clone for Parker {
9696
}
9797

9898
impl Unparker {
99-
pub(crate) fn unpark(&self, driver: &driver::Unpark) {
99+
pub(crate) fn unpark(&self, driver: &driver::Handle) {
100100
self.inner.unpark(driver);
101101
}
102102
}
@@ -195,7 +195,7 @@ impl Inner {
195195
}
196196
}
197197

198-
fn unpark(&self, driver: &driver::Unpark) {
198+
fn unpark(&self, driver: &driver::Handle) {
199199
// To ensure the unparked thread will observe any writes we made before
200200
// this call, we must perform a release operation that `park` can
201201
// synchronize with. To do that we must write `NOTIFIED` even if `state`

0 commit comments

Comments
 (0)