-
Notifications
You must be signed in to change notification settings - Fork 532
/
Copy pathasync_wrapper.rs
54 lines (43 loc) · 1.45 KB
/
async_wrapper.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use super::{Event, EventProcessor};
use async_channel::{Receiver, Sender};
pub struct AsyncProcessor<P: EventProcessor> {
sender: Sender<Message<P>>,
}
struct Worker<P: EventProcessor> {
processor: P,
rec: Receiver<Message<P>>,
}
impl<P: EventProcessor + 'static> Worker<P> {
pub fn start(processor: P, rec: Receiver<Message<P>>) {
let mut worker = Self { processor, rec };
std::thread::spawn(move || {
while let Ok(msg) = worker.rec.recv_blocking() {
match msg {
Message::Train(event) => worker.processor.process_train(event),
Message::Valid(event) => worker.processor.process_valid(event),
}
}
});
}
}
impl<P: EventProcessor + 'static> AsyncProcessor<P> {
pub fn new(processor: P) -> Self {
let (sender, rec) = async_channel::bounded(1);
Worker::start(processor, rec);
Self { sender }
}
}
enum Message<P: EventProcessor> {
Train(Event<P::ItemTrain>),
Valid(Event<P::ItemValid>),
}
impl<P: EventProcessor> EventProcessor for AsyncProcessor<P> {
type ItemTrain = P::ItemTrain;
type ItemValid = P::ItemValid;
fn process_train(&mut self, event: Event<Self::ItemTrain>) {
self.sender.send_blocking(Message::Train(event)).unwrap();
}
fn process_valid(&mut self, event: Event<Self::ItemValid>) {
self.sender.send_blocking(Message::Valid(event)).unwrap();
}
}