Skip to content

Commit d90a697

Browse files
authored
Refactor native video decoder for code reuse (#7641)
### What * Part of #7606 This refactors the native decoder so that we can easily slot in a different native decoder (ffmpeg!). The innermost trait is now `SyncDecoder` which gets pushed a chunk, and blocks while producing frames (or errors). Around that is an `AsyncDecoder` that runs the `SyncDecoder` on a background thread to produce a non-blocking interface. Finally in `re_renderer` there is the `NativeDecoder` that wraps `SyncDecoder` and handles texture uploads. There is a lot of code moved, but very little code actually _changed_. It's just another layer of abstraction introduced. ### Checklist * [x] I have read and agree to [Contributor Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and the [Code of Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md) * [x] I've included a screenshot or gif (if applicable) * [x] I have tested the web demo (if applicable): * Using examples from latest `main` build: [rerun.io/viewer](https://rerun.io/viewer/pr/7641?manifest_url=https://app.rerun.io/version/main/examples_manifest.json) * Using full set of examples from `nightly` build: [rerun.io/viewer](https://rerun.io/viewer/pr/7641?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json) * [x] The PR title and labels are set such as to maximize their usefulness for the next release's CHANGELOG * [x] If applicable, add a new check to the [release checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)! * [x] If have noted any breaking changes to the log API in `CHANGELOG.md` and the migration guide - [PR Build Summary](https://build.rerun.io/pr/7641) - [Recent benchmark results](https://build.rerun.io/graphs/crates.html) - [Wasm size tracking](https://build.rerun.io/graphs/sizes.html) To run all checks from `main`, comment on the PR with `@rerun-bot full-check`.
1 parent 46e0b3b commit d90a697

File tree

8 files changed

+246
-199
lines changed

8 files changed

+246
-199
lines changed

crates/store/re_video/examples/frames.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ fn main() {
2727
let video = std::fs::read(video_path).expect("failed to read video");
2828
let video = re_video::VideoData::load_mp4(&video).expect("failed to load video");
2929

30+
let sync_decoder = Box::new(
31+
re_video::decode::av1::SyncDav1dDecoder::new().expect("Failed to start AV1 decoder"),
32+
);
33+
3034
println!(
3135
"{} {}x{}",
3236
video.gops.len(),
@@ -38,14 +42,16 @@ fn main() {
3842
progress.enable_steady_tick(Duration::from_millis(100));
3943

4044
let frames = Arc::new(Mutex::new(Vec::new()));
41-
let mut decoder = re_video::decode::av1::Decoder::new("debug_name".to_owned(), {
45+
let on_output = {
4246
let frames = frames.clone();
4347
let progress = progress.clone();
4448
move |frame| {
4549
progress.inc(1);
4650
frames.lock().push(frame);
4751
}
48-
});
52+
};
53+
let mut decoder =
54+
re_video::decode::AsyncDecoder::new("debug_name".to_owned(), sync_decoder, on_output);
4955

5056
let start = Instant::now();
5157
for sample in &video.samples {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
use std::sync::{
2+
atomic::{AtomicBool, AtomicU64, Ordering},
3+
Arc,
4+
};
5+
6+
use crossbeam::channel::{unbounded, Receiver, Sender};
7+
8+
use super::{Chunk, Frame, OutputCallback, Result, SyncDecoder};
9+
10+
enum Command {
11+
Chunk(Chunk),
12+
Flush { on_done: Sender<()> },
13+
Reset,
14+
Stop,
15+
}
16+
17+
#[derive(Clone)]
18+
struct Comms {
19+
/// Set when it is time to die
20+
should_stop: Arc<AtomicBool>,
21+
22+
/// Incremented on each call to [`AsyncDecoder::reset`].
23+
/// Decremented each time the decoder thread receives [`Command::Reset`].
24+
num_outstanding_resets: Arc<AtomicU64>,
25+
}
26+
27+
impl Default for Comms {
28+
fn default() -> Self {
29+
Self {
30+
should_stop: Arc::new(AtomicBool::new(false)),
31+
num_outstanding_resets: Arc::new(AtomicU64::new(0)),
32+
}
33+
}
34+
}
35+
36+
/// Runs a [`SyncDecoder`] in a background thread, for non-blocking video decoding.
37+
pub struct AsyncDecoder {
38+
/// Where the decoding happens
39+
_thread: std::thread::JoinHandle<()>,
40+
41+
/// Commands sent to the decoder thread.
42+
command_tx: Sender<Command>,
43+
44+
/// Instant communication to the decoder thread (circumventing the command queue).
45+
comms: Comms,
46+
}
47+
48+
impl AsyncDecoder {
49+
pub fn new(
50+
debug_name: String,
51+
mut sync_decoder: Box<dyn SyncDecoder + Send>,
52+
on_output: impl Fn(Result<Frame>) + Send + Sync + 'static,
53+
) -> Self {
54+
re_tracing::profile_function!();
55+
56+
let (command_tx, command_rx) = unbounded();
57+
let comms = Comms::default();
58+
59+
let thread = std::thread::Builder::new()
60+
.name("av1_decoder".into())
61+
.spawn({
62+
let comms = comms.clone();
63+
move || {
64+
econtext::econtext_data!("Video", debug_name.clone());
65+
66+
decoder_thread(sync_decoder.as_mut(), &comms, &command_rx, &on_output);
67+
re_log::debug!("Closing decoder thread for {debug_name}");
68+
}
69+
})
70+
.expect("failed to spawn decoder thread");
71+
72+
Self {
73+
_thread: thread,
74+
command_tx,
75+
comms,
76+
}
77+
}
78+
79+
// NOTE: The interface is all `&mut self` to avoid certain types of races.
80+
pub fn decode(&mut self, chunk: Chunk) {
81+
re_tracing::profile_function!();
82+
self.command_tx.send(Command::Chunk(chunk)).ok();
83+
}
84+
85+
/// Resets the decoder.
86+
///
87+
/// This does not block, all chunks sent to `decode` before this point will be discarded.
88+
// NOTE: The interface is all `&mut self` to avoid certain types of races.
89+
pub fn reset(&mut self) {
90+
re_tracing::profile_function!();
91+
92+
// Increment resets first…
93+
self.comms
94+
.num_outstanding_resets
95+
.fetch_add(1, Ordering::Release);
96+
97+
// …so it is visible on the decoder thread when it gets the `Reset` command.
98+
self.command_tx.send(Command::Reset).ok();
99+
}
100+
101+
/// Blocks until all pending frames have been decoded.
102+
// NOTE: The interface is all `&mut self` to avoid certain types of races.
103+
pub fn flush(&mut self) {
104+
re_tracing::profile_function!();
105+
let (tx, rx) = crossbeam::channel::bounded(0);
106+
self.command_tx.send(Command::Flush { on_done: tx }).ok();
107+
rx.recv().ok();
108+
}
109+
}
110+
111+
impl Drop for AsyncDecoder {
112+
fn drop(&mut self) {
113+
re_tracing::profile_function!();
114+
115+
// Set `should_stop` first…
116+
self.comms.should_stop.store(true, Ordering::Release);
117+
118+
// …so it is visible on the decoder thread when it gets the `Stop` command.
119+
self.command_tx.send(Command::Stop).ok();
120+
121+
// NOTE: we don't block here. The decoder thread will finish soon enough.
122+
}
123+
}
124+
125+
fn decoder_thread(
126+
decoder: &mut dyn SyncDecoder,
127+
comms: &Comms,
128+
command_rx: &Receiver<Command>,
129+
on_output: &OutputCallback,
130+
) {
131+
#![allow(clippy::debug_assert_with_mut_call)]
132+
133+
while let Ok(command) = command_rx.recv() {
134+
if comms.should_stop.load(Ordering::Acquire) {
135+
re_log::debug!("Should stop");
136+
return;
137+
}
138+
139+
// If we're waiting for a reset we should ignore all other commands until we receive it.
140+
let has_outstanding_reset = 0 < comms.num_outstanding_resets.load(Ordering::Acquire);
141+
142+
match command {
143+
Command::Chunk(chunk) => {
144+
if !has_outstanding_reset {
145+
decoder.submit_chunk(&comms.should_stop, chunk, on_output);
146+
}
147+
}
148+
Command::Flush { on_done } => {
149+
on_done.send(()).ok();
150+
}
151+
Command::Reset => {
152+
decoder.reset();
153+
comms.num_outstanding_resets.fetch_sub(1, Ordering::Release);
154+
}
155+
Command::Stop => {
156+
re_log::debug!("Stop");
157+
return;
158+
}
159+
}
160+
}
161+
162+
re_log::debug!("Disconnected");
163+
}

0 commit comments

Comments
 (0)