Skip to content

Commit d0fe589

Browse files
authored
Merge pull request #6 from aturon/mio-reintegration
Mio reintegration
2 parents ce292be + bc60d83 commit d0fe589

File tree

11 files changed

+806
-360
lines changed

11 files changed

+806
-360
lines changed

mio/Cargo.lock

+12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mio/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ authors = ["Alex Crichton <[email protected]>"]
77
mio = { git = "https://github.com/alexcrichton/mio", branch = "tcp-sync" }
88
# mio = "0.5"
99
futures = { path = ".." }
10+
scoped-tls = "0.1.0"
11+
slab = "0.2.0"
1012

1113
[lib]
1214
test = false

mio/src/event_loop.rs

+342
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
use std::cell::{Cell, RefCell};
2+
use std::io::{self, ErrorKind};
3+
use std::sync::Arc;
4+
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
5+
use std::sync::mpsc;
6+
7+
use mio;
8+
use slab::Slab;
9+
use futures::{Future, Tokens, Wake, PollResult};
10+
11+
pub type Waiter = Arc<Wake>;
12+
pub type Source = Arc<mio::Evented + Send + Sync>;
13+
14+
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
15+
scoped_thread_local!(static CURRENT_LOOP: Loop);
16+
17+
const SLAB_CAPACITY: usize = 1024 * 64;
18+
19+
pub struct Loop {
20+
id: usize,
21+
active: Cell<bool>,
22+
io: RefCell<mio::Poll>,
23+
tx: mio::channel::Sender<Message>,
24+
rx: mio::channel::Receiver<Message>,
25+
dispatch: RefCell<Slab<Scheduled, usize>>,
26+
}
27+
28+
#[derive(Clone)]
29+
pub struct LoopHandle {
30+
id: usize,
31+
tx: mio::channel::Sender<Message>,
32+
}
33+
34+
#[derive(Copy, Clone)]
35+
pub enum Direction {
36+
Read,
37+
Write,
38+
}
39+
40+
struct Scheduled {
41+
source: Source,
42+
reader: Option<Waiter>,
43+
writer: Option<Waiter>,
44+
}
45+
46+
impl Scheduled {
47+
fn waiter_for(&mut self, dir: Direction) -> &mut Option<Waiter> {
48+
match dir {
49+
Direction::Read => &mut self.reader,
50+
Direction::Write => &mut self.writer,
51+
}
52+
}
53+
54+
fn event_set(&self) -> mio::EventSet {
55+
let mut set = mio::EventSet::none();
56+
if self.reader.is_some() {
57+
set = set | mio::EventSet::readable()
58+
}
59+
if self.writer.is_some() {
60+
set = set | mio::EventSet::writable()
61+
}
62+
set
63+
}
64+
}
65+
66+
enum Message {
67+
AddSource(Source, Arc<AtomicUsize>, Waiter),
68+
DropSource(usize),
69+
Schedule(usize, Direction, Waiter),
70+
Deschedule(usize, Direction),
71+
Shutdown,
72+
}
73+
74+
fn register(poll: &mut mio::Poll, token: usize, sched: &Scheduled) {
75+
// TODO: handle error
76+
poll.register(&*sched.source,
77+
mio::Token(token),
78+
mio::EventSet::none(),
79+
mio::PollOpt::level())
80+
.unwrap();
81+
}
82+
83+
fn reregister(poll: &mut mio::Poll, token: usize, sched: &Scheduled) {
84+
// TODO: handle error
85+
poll.reregister(&*sched.source,
86+
mio::Token(token),
87+
sched.event_set(),
88+
mio::PollOpt::edge() | mio::PollOpt::oneshot())
89+
.unwrap();
90+
}
91+
92+
fn deregister(poll: &mut mio::Poll, sched: &Scheduled) {
93+
// TODO: handle error
94+
poll.deregister(&*sched.source).unwrap();
95+
}
96+
97+
impl Loop {
98+
pub fn new() -> io::Result<Loop> {
99+
let (tx, rx) = mio::channel::from_std_channel(mpsc::channel());
100+
let io = try!(mio::Poll::new());
101+
try!(io.register(&rx,
102+
mio::Token(0),
103+
mio::EventSet::readable(),
104+
mio::PollOpt::edge()));
105+
Ok(Loop {
106+
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
107+
active: Cell::new(true),
108+
io: RefCell::new(io),
109+
tx: tx,
110+
rx: rx,
111+
dispatch: RefCell::new(Slab::new_starting_at(1, SLAB_CAPACITY)),
112+
})
113+
}
114+
115+
pub fn handle(&self) -> LoopHandle {
116+
LoopHandle {
117+
id: self.id,
118+
tx: self.tx.clone(),
119+
}
120+
}
121+
122+
pub fn run<F: Future>(self, f: F) -> Result<F::Item, F::Error> {
123+
let (tx_res, rx_res) = mpsc::channel();
124+
let handle = self.handle();
125+
f.then(move |res| {
126+
handle.shutdown();
127+
tx_res.send(res)
128+
}).forget();
129+
130+
while self.active.get() {
131+
let amt;
132+
// On Linux, Poll::poll is epoll_wait, which may return EINTR if a
133+
// ptracer attaches. This retry loop prevents crashing when
134+
// attaching strace, or similar.
135+
loop {
136+
match self.io.borrow_mut().poll(None) {
137+
Ok(a) => {
138+
amt = a;
139+
break;
140+
}
141+
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
142+
err @ Err(_) => {
143+
err.unwrap();
144+
}
145+
}
146+
}
147+
148+
// TODO: coalesce token sets for a given Wake?
149+
for i in 0..amt {
150+
let event = self.io.borrow_mut().events().get(i).unwrap();
151+
let token = event.token().as_usize();
152+
if token == 0 {
153+
self.consume_queue();
154+
} else {
155+
let mut reader = None;
156+
let mut writer = None;
157+
158+
if let Some(sched) = self.dispatch.borrow_mut().get_mut(token) {
159+
if event.kind().is_readable() {
160+
reader = sched.reader.take();
161+
}
162+
163+
if event.kind().is_writable() {
164+
writer = sched.writer.take();
165+
}
166+
}
167+
168+
CURRENT_LOOP.set(&self, || {
169+
if let Some(reader_wake) = reader.take() {
170+
reader_wake.wake(&Tokens::from_usize(token));
171+
}
172+
if let Some(writer_wake) = writer.take() {
173+
writer_wake.wake(&Tokens::from_usize(token));
174+
}
175+
});
176+
177+
// For now, always reregister, to deal with the fact that
178+
// combined oneshot + read|write requires rearming even if
179+
// only one side fired.
180+
//
181+
// TODO: optimize this
182+
if let Some(sched) = self.dispatch.borrow().get(token) {
183+
reregister(&mut self.io.borrow_mut(), token, &sched);
184+
}
185+
}
186+
}
187+
}
188+
189+
rx_res.recv().unwrap()
190+
}
191+
192+
fn add_source(&self, source: Source) -> usize {
193+
let sched = Scheduled {
194+
source: source,
195+
reader: None,
196+
writer: None,
197+
};
198+
let mut dispatch = self.dispatch.borrow_mut();
199+
// TODO: handle out of space
200+
let entry = dispatch.vacant_entry().unwrap();
201+
register(&mut self.io.borrow_mut(), entry.index(), &sched);
202+
entry.insert(sched).index()
203+
}
204+
205+
fn drop_source(&self, token: usize) {
206+
let sched = self.dispatch.borrow_mut().remove(token).unwrap();
207+
deregister(&mut self.io.borrow_mut(), &sched);
208+
}
209+
210+
fn schedule(&self, token: usize, dir: Direction, wake: Waiter) {
211+
let mut dispatch = self.dispatch.borrow_mut();
212+
let sched = dispatch.get_mut(token).unwrap();
213+
*sched.waiter_for(dir) = Some(wake);
214+
reregister(&mut self.io.borrow_mut(), token, sched);
215+
}
216+
217+
fn deschedule(&self, token: usize, dir: Direction) {
218+
let mut dispatch = self.dispatch.borrow_mut();
219+
let sched = dispatch.get_mut(token).unwrap();
220+
*sched.waiter_for(dir) = None;
221+
reregister(&mut self.io.borrow_mut(), token, sched);
222+
}
223+
224+
fn consume_queue(&self) {
225+
while let Ok(msg) = self.rx.try_recv() {
226+
self.notify(msg);
227+
}
228+
}
229+
230+
fn notify(&self, msg: Message) {
231+
match msg {
232+
Message::AddSource(source, id, wake) => {
233+
let tok = self.add_source(source);
234+
id.store(tok, Ordering::Relaxed);
235+
wake.wake(&Tokens::from_usize(ADD_SOURCE_TOKEN));
236+
}
237+
Message::DropSource(tok) => self.drop_source(tok),
238+
Message::Schedule(tok, dir, wake) => self.schedule(tok, dir, wake),
239+
Message::Deschedule(tok, dir) => self.deschedule(tok, dir),
240+
Message::Shutdown => self.active.set(false),
241+
}
242+
}
243+
}
244+
245+
impl LoopHandle {
246+
fn send(&self, msg: Message) {
247+
let mut msg_dance = Some(msg);
248+
249+
if CURRENT_LOOP.is_set() {
250+
CURRENT_LOOP.with(|lp| {
251+
if lp.id == self.id {
252+
// Need to execute all existing requests first, to ensure
253+
// that our message is processed "in order"
254+
lp.consume_queue();
255+
lp.notify(msg_dance.take().unwrap());
256+
}
257+
})
258+
}
259+
260+
if let Some(msg) = msg_dance.take() {
261+
self.tx
262+
.send(msg)
263+
.map_err(|_| ())
264+
.expect("failed to send register message") // todo: handle failure
265+
}
266+
}
267+
268+
pub fn add_source(&self, source: Source) -> AddSource {
269+
AddSource {
270+
loop_handle: self.clone(),
271+
source: Some(source),
272+
id: Arc::new(AtomicUsize::new(0)),
273+
scheduled: false,
274+
}
275+
}
276+
277+
fn add_source_(&self, source: Source, id: Arc<AtomicUsize>, wake: Waiter) {
278+
self.send(Message::AddSource(source, id, wake));
279+
}
280+
281+
pub fn drop_source(&self, tok: usize) {
282+
self.send(Message::DropSource(tok));
283+
}
284+
285+
pub fn schedule(&self, tok: usize, dir: Direction, wake: Waiter) {
286+
self.send(Message::Schedule(tok, dir, wake));
287+
}
288+
289+
pub fn deschedule(&self, tok: usize, dir: Direction) {
290+
self.send(Message::Deschedule(tok, dir));
291+
}
292+
293+
pub fn shutdown(&self) {
294+
self.send(Message::Shutdown);
295+
}
296+
}
297+
298+
const ADD_SOURCE_TOKEN: usize = 0;
299+
300+
pub struct AddSource {
301+
loop_handle: LoopHandle,
302+
source: Option<Source>,
303+
id: Arc<AtomicUsize>,
304+
scheduled: bool,
305+
}
306+
307+
impl Future for AddSource {
308+
type Item = usize;
309+
type Error = io::Error; // TODO: integrate channel error?
310+
311+
fn poll(&mut self, tokens: &Tokens) -> Option<PollResult<usize, io::Error>> {
312+
if self.scheduled {
313+
if tokens.may_contain(&Tokens::from_usize(ADD_SOURCE_TOKEN)) {
314+
let id = self.id.load(Ordering::Relaxed);
315+
if id != 0 {
316+
return Some(Ok(id))
317+
}
318+
}
319+
} else {
320+
if CURRENT_LOOP.is_set() {
321+
let res = CURRENT_LOOP.with(|lp| {
322+
if lp.id == self.loop_handle.id {
323+
Some(lp.add_source(self.source.take().unwrap()))
324+
} else {
325+
None
326+
}
327+
});
328+
if let Some(id) = res {
329+
return Some(Ok(id));
330+
}
331+
}
332+
}
333+
334+
None
335+
}
336+
337+
fn schedule(&mut self, wake: Arc<Wake>) {
338+
if self.scheduled { return; }
339+
self.scheduled = true;
340+
self.loop_handle.add_source_(self.source.take().unwrap(), self.id.clone(), wake);
341+
}
342+
}

0 commit comments

Comments
 (0)