Skip to content

Migrate file format to protobuf #8995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6250,7 +6250,6 @@ dependencies = [
"re_smart_channel",
"re_tracing",
"re_types",
"rmp-serde",
"serde_test",
"similar-asserts",
"thiserror 1.0.65",
Expand Down Expand Up @@ -7635,28 +7634,6 @@ dependencies = [
"windows-sys 0.52.0",
]

[[package]]
name = "rmp"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
dependencies = [
"byteorder",
"num-traits",
"paste",
]

[[package]]
name = "rmp-serde"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
dependencies = [
"byteorder",
"rmp",
"serde",
]

[[package]]
name = "ron"
version = "0.8.1"
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ rfd = { version = "0.15", default-features = false, features = [
"async-std",
"xdg-portal",
] }
rmp-serde = "1"
ron = "0.8.0"
roxmltree = "0.19.0"
rust-format = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ mod tests {

let mut encoder = DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::EncodingOptions::MSGPACK_UNCOMPRESSED,
re_log_encoding::EncodingOptions::PROTOBUF_UNCOMPRESSED,
rrd_file,
)
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_entity_db/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn log_messages() {

fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
let mut bytes = vec![];
let encoding_options = re_log_encoding::EncodingOptions::MSGPACK_COMPRESSED;
let encoding_options = re_log_encoding::EncodingOptions::PROTOBUF_COMPRESSED;
re_log_encoding::encoder::encode_ref(
re_build_info::CrateVersion::LOCAL,
encoding_options,
Expand Down
4 changes: 1 addition & 3 deletions crates/store/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ decoder = [
"re_log_types/serde",
"dep:bytes",
"dep:lz4_flex",
"dep:rmp-serde",
"dep:tokio",
"dep:tokio-stream",
]

## Enable encoding of log messages to an .rrd file/stream.
encoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
encoder = ["dep:lz4_flex", "re_log_types/serde"]

## Enable streaming of .rrd files from HTTP.
stream_from_http = [
Expand Down Expand Up @@ -67,7 +66,6 @@ thiserror.workspace = true
bytes = { workspace = true, optional = true }
ehttp = { workspace = true, optional = true, features = ["streaming"] }
lz4_flex = { workspace = true, optional = true }
rmp-serde = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["io-util"] }
tokio-stream = { workspace = true, optional = true }
web-time = { workspace = true, optional = true }
Expand Down
91 changes: 0 additions & 91 deletions crates/store/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use re_log_types::{
};

use re_log_encoding::EncodingOptions;
const MSGPACK_COMPRESSED: EncodingOptions = EncodingOptions::MSGPACK_COMPRESSED;
const PROTOBUF_COMPRESSED: EncodingOptions = EncodingOptions::PROTOBUF_COMPRESSED;

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -112,20 +111,9 @@ fn mono_points_arrow(c: &mut Criterion) {
b.iter(|| generate_messages(&store_id, &chunks));
});
let messages = generate_messages(&store_id, &chunks);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages, MSGPACK_COMPRESSED));
});
group.bench_function("encode_log_msg(protobuf)", |b| {
b.iter(|| encode_log_msgs(&messages, PROTOBUF_COMPRESSED));
});
group.bench_function("encode_total", |b| {
b.iter(|| {
encode_log_msgs(
&generate_messages(&store_id, &generate_chunks()),
MSGPACK_COMPRESSED,
)
});
});
group.bench_function("encode_total(protobuf)", |b| {
b.iter(|| {
encode_log_msgs(
Expand All @@ -136,25 +124,6 @@ fn mono_points_arrow(c: &mut Criterion) {
});

{
let encoded = encode_log_msgs(&messages, MSGPACK_COMPRESSED);
group.bench_function("decode_log_msg", |b| {
b.iter(|| {
let decoded = decode_log_msgs(&encoded);
assert_eq!(decoded.len(), messages.len());
decoded
});
});
group.bench_function("decode_message_bundles", |b| {
b.iter(|| {
let chunks = decode_chunks(&messages);
assert_eq!(chunks.len(), messages.len());
chunks
});
});
group.bench_function("decode_total", |b| {
b.iter(|| decode_chunks(&decode_log_msgs(&encoded)));
});

let encoded = encode_log_msgs(&messages, PROTOBUF_COMPRESSED);
group.bench_function("decode_log_msg(protobuf)", |b| {
b.iter(|| {
Expand Down Expand Up @@ -205,20 +174,9 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
b.iter(|| generate_messages(&store_id, &chunks));
});
let messages = generate_messages(&store_id, &chunks);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages, MSGPACK_COMPRESSED));
});
group.bench_function("encode_log_msg(protobuf)", |b| {
b.iter(|| encode_log_msgs(&messages, PROTOBUF_COMPRESSED));
});
group.bench_function("encode_total", |b| {
b.iter(|| {
encode_log_msgs(
&generate_messages(&store_id, &[generate_chunk()]),
MSGPACK_COMPRESSED,
)
});
});
group.bench_function("encode_total(protobuf)", |b| {
b.iter(|| {
encode_log_msgs(
Expand All @@ -229,25 +187,6 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
});

{
let encoded = encode_log_msgs(&messages, MSGPACK_COMPRESSED);
group.bench_function("decode_log_msg", |b| {
b.iter(|| {
let decoded = decode_log_msgs(&encoded);
assert_eq!(decoded.len(), messages.len());
decoded
});
});
group.bench_function("decode_message_bundles", |b| {
b.iter(|| {
let bundles = decode_chunks(&messages);
assert_eq!(bundles.len(), messages.len());
bundles
});
});
group.bench_function("decode_total", |b| {
b.iter(|| decode_chunks(&decode_log_msgs(&encoded)));
});

let encoded = encode_log_msgs(&messages, PROTOBUF_COMPRESSED);
group.bench_function("decode_log_msg(protobuf)", |b| {
b.iter(|| {
Expand Down Expand Up @@ -297,20 +236,9 @@ fn batch_points_arrow(c: &mut Criterion) {
b.iter(|| generate_messages(&store_id, &chunks));
});
let messages = generate_messages(&store_id, &chunks);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages, MSGPACK_COMPRESSED));
});
group.bench_function("encode_log_msg(protobuf)", |b| {
b.iter(|| encode_log_msgs(&messages, PROTOBUF_COMPRESSED));
});
group.bench_function("encode_total", |b| {
b.iter(|| {
encode_log_msgs(
&generate_messages(&store_id, &generate_chunks()),
MSGPACK_COMPRESSED,
)
});
});
group.bench_function("encode_total(protobuf)", |b| {
b.iter(|| {
encode_log_msgs(
Expand All @@ -321,25 +249,6 @@ fn batch_points_arrow(c: &mut Criterion) {
});

{
let encoded = encode_log_msgs(&messages, MSGPACK_COMPRESSED);
group.bench_function("decode_log_msg", |b| {
b.iter(|| {
let decoded = decode_log_msgs(&encoded);
assert_eq!(decoded.len(), messages.len());
decoded
});
});
group.bench_function("decode_message_bundles", |b| {
b.iter(|| {
let chunks = decode_chunks(&messages);
assert_eq!(chunks.len(), messages.len());
chunks
});
});
group.bench_function("decode_total", |b| {
b.iter(|| decode_chunks(&decode_log_msgs(&encoded)));
});

let encoded = encode_log_msgs(&messages, PROTOBUF_COMPRESSED);
group.bench_function("decode_log_msg(protobuf)", |b| {
b.iter(|| {
Expand Down
3 changes: 3 additions & 0 deletions crates/store/re_log_encoding/src/codec/file/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMs
Ok((read_bytes, msg))
}

/// Decode a message of kind `message_kind` from `buf`.
///
/// `Ok(None)` returned from this function marks the end of the file stream.
pub fn decode_bytes(message_kind: MessageKind, buf: &[u8]) -> Result<Option<LogMsg>, DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};
Expand Down
13 changes: 11 additions & 2 deletions crates/store/re_log_encoding/src/codec/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ pub(crate) struct MessageHeader {
}

impl MessageHeader {
/// Size of an encoded message header, in bytes.
// NOTE: This is `size_of` on a `repr(Rust)` struct,
// which is fine because we control the layout
// in the definition above, and tests would quickly
// catch any sort of regression.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not switching to the C ABI if we need control over layout though?

Copy link
Member Author

@jprochazk jprochazk Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this a bit more, and maybe this should just be a literal constant. We need control over the encoding, not the layout of the struct.
Alternatively we can control the layout of the struct, and use something like bytemuck to cast to/from a slice of bytes. But that's an extra dependency that may not be worth it for something so simple

pub const SIZE_BYTES: usize = std::mem::size_of::<Self>();

// NOTE: We use little-endian encoding, because we live in
// the 21st century.
Comment on lines +36 to +37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this just after reviewing an arrow PR that encodes stuff as BE to be spec compliant lol

Copy link
Member Author

@jprochazk jprochazk Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is our format, and it doesn't go over the network so nobody can claim it should be network-endian! 😄

...well it technically does if you download it over http. But we must resist the CPU cycles wasted on swapping bytes.

#[cfg(feature = "encoder")]
pub(crate) fn encode(
&self,
Expand All @@ -45,7 +54,7 @@ impl MessageHeader {
pub(crate) fn decode(
data: &mut impl std::io::Read,
) -> Result<Self, crate::decoder::DecodeError> {
let mut buf = [0; std::mem::size_of::<Self>()];
let mut buf = [0; Self::SIZE_BYTES];
data.read_exact(&mut buf)?;

Self::from_bytes(&buf)
Expand All @@ -55,7 +64,7 @@ impl MessageHeader {
/// TODO(zehiko) this should be public, we need to shuffle things around to ensure that #8726
#[cfg(feature = "decoder")]
pub fn from_bytes(buf: &[u8]) -> Result<Self, crate::decoder::DecodeError> {
if buf.len() != 16 {
if buf.len() != Self::SIZE_BYTES {
return Err(crate::decoder::DecodeError::Codec(
crate::codec::CodecError::HeaderDecoding(std::io::Error::new(
std::io::ErrorKind::InvalidData,
Expand Down
Loading
Loading