Skip to content

Commit 68ecaee

Browse files
authored
Add experimental send_recording python api (#9148)
### Related * Resolves #2044 ### What Adds api to send a recording loaded from RRD to a new recording stream, cloning the data in the process
1 parent e7fbd0c commit 68ecaee

File tree

11 files changed

+159
-5
lines changed

11 files changed

+159
-5
lines changed

.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ pixi.lock linguist-generated=true
55
# git-lfs stuff
66
**/snapshots/**/*.png filter=lfs diff=lfs merge=lfs -text
77
tests/assets/video/*.mp4 filter=lfs diff=lfs merge=lfs -text
8+
examples/assets/example.rrd filter=lfs diff=lfs merge=lfs -text

crates/top/re_sdk/src/recording_stream.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -1696,10 +1696,20 @@ impl RecordingStream {
16961696
}
16971697
}
16981698

1699-
/// Records a single [`Chunk`].
1699+
/// Logs multiple [`Chunk`]s.
17001700
///
17011701
/// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1702-
/// for that use [`Self::log_chunk`].
1702+
/// for that use [`Self::log_chunks`].
1703+
pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1704+
for chunk in chunks {
1705+
self.log_chunk(chunk);
1706+
}
1707+
}
1708+
1709+
/// Records a single [`Chunk`].
1710+
///
1711+
/// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1712+
/// If you don't want to inject these, use [`Self::send_chunks`] instead.
17031713
#[inline]
17041714
pub fn send_chunk(&self, chunk: Chunk) {
17051715
let f = move |inner: &RecordingStreamInner| {
@@ -1711,6 +1721,16 @@ impl RecordingStream {
17111721
}
17121722
}
17131723

1724+
/// Records multiple [`Chunk`]s.
1725+
///
1726+
/// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1727+
/// for that use [`Self::log_chunks`].
1728+
pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1729+
for chunk in chunks {
1730+
self.send_chunk(chunk);
1731+
}
1732+
}
1733+
17141734
/// Swaps the underlying sink for a new one.
17151735
///
17161736
/// This guarantees that:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
"""Send a dataframe to a new recording stream."""
2+
3+
import sys
4+
5+
import rerun as rr
6+
7+
path_to_rrd = sys.argv[1]
8+
9+
# NOTE: This is specifically demonstrating how to send `rr.dataframe.Recording` into the viewer.
10+
# If you just want to view an RRD file, use the simpler `rr.log_file()` function instead:
11+
# rr.log_file("path/to/file.rrd", spawn=True)
12+
13+
recording = rr.dataframe.load_recording(path_to_rrd)
14+
15+
rr.init(recording.application_id(), recording_id=recording.recording_id(), spawn=True)
16+
rr.send_recording(recording)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! Send a `.rrd` to a new recording stream.
2+
3+
use rerun::external::re_chunk_store::{ChunkStore, ChunkStoreConfig};
4+
use rerun::external::re_log_types::{LogMsg, SetStoreInfo};
5+
use rerun::external::re_tuid::Tuid;
6+
use rerun::VersionPolicy;
7+
8+
fn main() -> Result<(), Box<dyn std::error::Error>> {
9+
// Get the filename from the command-line args.
10+
let filename = std::env::args().nth(2).ok_or("Missing filename argument")?;
11+
12+
// Load the chunk store from the file.
13+
let (store_id, store) =
14+
ChunkStore::from_rrd_filepath(&ChunkStoreConfig::DEFAULT, filename, VersionPolicy::Warn)?
15+
.into_iter()
16+
.next()
17+
.ok_or("Expected exactly one recording in the archive")?;
18+
19+
// Use the same app and recording IDs as the original.
20+
if let Some(info) = store.info().cloned() {
21+
let new_recording = rerun::RecordingStreamBuilder::new(info.application_id.clone())
22+
.recording_id(store_id.to_string())
23+
.spawn()?;
24+
25+
new_recording.record_msg(LogMsg::SetStoreInfo(SetStoreInfo {
26+
row_id: Tuid::new(),
27+
info,
28+
}));
29+
30+
// Forward all chunks to the new recording stream.
31+
new_recording.send_chunks(store.iter_chunks().map(|chunk| (**chunk).clone()));
32+
}
33+
34+
Ok(())
35+
}

docs/snippets/snippets.toml

+4
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ features = [
9999
"cpp",
100100
"rust",
101101
]
102+
"concepts/send_recording" = [ # Not implemented for C++
103+
"cpp",
104+
]
102105
"concepts/static/log_static" = [ # pseudo-code
103106
"py",
104107
"cpp",
@@ -361,6 +364,7 @@ quick_start = [ # These examples don't have exactly the same implementation.
361364

362365
# `$config_dir` will be replaced with the absolute path of `docs/snippets`.
363366
[extra_args]
367+
"concepts/send_recording" = ["$config_dir/../../tests/assets/rrd/dna.rrd"]
364368
"archetypes/asset3d_simple" = ["$config_dir/../../tests/assets/cube.glb"]
365369
"archetypes/asset3d_out_of_tree" = ["$config_dir/../../tests/assets/cube.glb"]
366370
"archetypes/video_auto_frames" = [

examples/assets/example.rrd

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:4467aa40910101da2f308ba2a526a5984ab9512b628257a8dae104818e3f3c03
3-
size 1739
2+
oid sha256:1868150a997a96d4a98dec7da92f5ba313918416c3b3d1b57a205b0890e72c88
3+
size 2250

rerun_py/rerun_bindings/rerun_bindings.pyi

+8
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,14 @@ def send_blueprint(
881881
) -> None:
882882
"""Send a blueprint to the given recording stream."""
883883

884+
def send_recording(rrd: Recording, recording: Optional[PyRecordingStream] = None) -> None:
885+
"""
886+
Send all chunks from a [`PyRecording`] to the given recording stream.
887+
888+
.. warning::
889+
⚠️ This API is experimental and may change or be removed in future versions! ⚠️
890+
"""
891+
884892
#
885893
# misc
886894
#

rerun_py/rerun_sdk/rerun/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
disconnect as disconnect,
182182
save as save,
183183
send_blueprint as send_blueprint,
184+
send_recording as send_recording,
184185
serve_grpc as serve_grpc,
185186
serve_web as serve_web,
186187
spawn as spawn,

rerun_py/rerun_sdk/rerun/recording_stream.py

+19
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import numpy as np
1414
from typing_extensions import deprecated
1515

16+
import rerun as rr
1617
from rerun import bindings
1718
from rerun.memory import MemoryRecording
1819
from rerun.time import reset_time
@@ -721,6 +722,24 @@ def send_blueprint(
721722

722723
send_blueprint(blueprint=blueprint, make_active=make_active, make_default=make_default, recording=self)
723724

725+
def send_recording(self, recording: rr.dataframe.Recording) -> None:
726+
"""
727+
Send a `Recording` loaded from a `.rrd` to the `RecordingStream`.
728+
729+
.. warning::
730+
⚠️ This API is experimental and may change or be removed in future versions! ⚠️
731+
732+
Parameters
733+
----------
734+
recording:
735+
A `Recording` loaded from a `.rrd`.
736+
737+
"""
738+
739+
from .sinks import send_recording
740+
741+
send_recording(rrd=recording, recording=self)
742+
724743
def spawn(
725744
self,
726745
*,

rerun_py/rerun_sdk/rerun/sinks.py

+30
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import rerun_bindings as bindings
88

99
from rerun.blueprint.api import BlueprintLike, create_in_memory_blueprint
10+
from rerun.dataframe import Recording
11+
from rerun.recording_stream import RecordingStream, get_application_id
1012

1113
from ._spawn import _spawn_viewer
1214

@@ -398,6 +400,34 @@ def send_blueprint(
398400
)
399401

400402

403+
def send_recording(rrd: Recording, recording: RecordingStream | None = None) -> None:
404+
"""
405+
Send a `Recording` loaded from a `.rrd` to the `RecordingStream`.
406+
407+
.. warning::
408+
⚠️ This API is experimental and may change or be removed in future versions! ⚠️
409+
410+
Parameters
411+
----------
412+
rrd:
413+
A recording loaded from a `.rrd` file.
414+
recording:
415+
Specifies the [`rerun.RecordingStream`][] to use.
416+
If left unspecified, defaults to the current active data recording, if there is one.
417+
See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
418+
419+
"""
420+
application_id = get_application_id(recording=recording) # NOLINT
421+
422+
if application_id is None:
423+
raise ValueError("No application id found. You must call rerun.init before sending a recording.")
424+
425+
bindings.send_recording(
426+
rrd,
427+
recording=recording.to_native() if recording is not None else None,
428+
)
429+
430+
401431
def spawn(
402432
*,
403433
port: int = 9876,

rerun_py/src/python_bridge.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ impl PyRuntimeErrorExt for PyRuntimeError {
4242

4343
use once_cell::sync::{Lazy, OnceCell};
4444

45+
use crate::dataframe::PyRecording;
46+
4547
// The bridge needs to have complete control over the lifetimes of the individual recordings,
4648
// otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python
4749
// data and joining a bunch of threads) can end up running at any time depending on what the
@@ -172,6 +174,7 @@ fn rerun_bindings(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
172174
m.add_function(wrap_pyfunction!(log_file_from_contents, m)?)?;
173175
m.add_function(wrap_pyfunction!(send_arrow_chunk, m)?)?;
174176
m.add_function(wrap_pyfunction!(send_blueprint, m)?)?;
177+
m.add_function(wrap_pyfunction!(send_recording, m)?)?;
175178

176179
// misc
177180
m.add_function(wrap_pyfunction!(version, m)?)?;
@@ -1341,6 +1344,23 @@ fn send_blueprint(
13411344
}
13421345
}
13431346

1347+
/// Send all chunks from a [`PyRecording`] to the given recording stream.
1348+
///
1349+
/// .. warning::
1350+
/// ⚠️ This API is experimental and may change or be removed in future versions! ⚠️
1351+
#[pyfunction]
1352+
#[pyo3(signature = (rrd, recording = None))]
1353+
fn send_recording(rrd: &PyRecording, recording: Option<&PyRecordingStream>) {
1354+
let Some(recording) = get_data_recording(recording) else {
1355+
return;
1356+
};
1357+
1358+
let store = rrd.store.read();
1359+
for chunk in store.iter_chunks() {
1360+
recording.send_chunk((**chunk).clone());
1361+
}
1362+
}
1363+
13441364
// --- Misc ---
13451365

13461366
/// Return a verbose version string.
@@ -1471,7 +1491,7 @@ fn send_recording_start_time_nanos(
14711491

14721492
// --- Helpers ---
14731493

1474-
fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
1494+
pub fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
14751495
let py_version = py.version_info();
14761496
re_log_types::PythonVersion {
14771497
major: py_version.major,

0 commit comments

Comments
 (0)