From aa6a5de98ed7721e83f26315f3bb6799aa7f748f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jochen=20G=C3=B6rtler?= Date: Mon, 7 Apr 2025 09:41:09 +0200 Subject: [PATCH 01/19] Initial changes to viewer and `re_grpc_server` Pulled out from #9456. --- Cargo.lock | 4 + crates/store/re_grpc_server/Cargo.toml | 1 + crates/store/re_grpc_server/src/lib.rs | 279 +++++++++++++++--- crates/store/re_log_types/src/lib.rs | 106 ++++++- .../proto/rerun/v1alpha1/common.proto | 5 + .../proto/rerun/v1alpha1/sdk_comms.proto | 23 ++ crates/store/re_protos/src/lib.rs | 21 ++ .../src/v1alpha1/rerun.common.v1alpha1.ext.rs | 18 +- .../src/v1alpha1/rerun.common.v1alpha1.rs | 16 + .../src/v1alpha1/rerun.sdk_comms.v1alpha1.rs | 196 ++++++++++++ crates/top/rerun/Cargo.toml | 1 + crates/top/rerun/src/commands/entrypoint.rs | 45 +-- crates/viewer/re_viewer/Cargo.toml | 2 + crates/viewer/re_viewer/src/app.rs | 93 ++++-- crates/viewer/re_viewer/src/lib.rs | 25 ++ crates/viewer/re_viewer/src/native.rs | 2 +- crates/viewer/re_viewer/src/web.rs | 60 +++- .../viewer/re_viewer_context/src/store_hub.rs | 6 + examples/rust/custom_callback/src/viewer.rs | 5 +- examples/rust/custom_view/src/main.rs | 4 +- examples/rust/extend_viewer_ui/src/main.rs | 4 +- examples/rust/viewer_callbacks/src/main.rs | 4 +- rerun_js/web-viewer/index.ts | 25 +- rerun_notebook/src/js/widget.ts | 4 + rerun_notebook/src/rerun_notebook/__init__.py | 13 + rerun_py/rerun_sdk/rerun/notebook.py | 26 +- 26 files changed, 890 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a450c3bae8a0..d42a393e7fb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7144,6 +7144,7 @@ dependencies = [ name = "re_grpc_server" version = "0.23.0-alpha.1+dev" dependencies = [ + "crossbeam", "parking_lot", "re_build_info", "re_byte_size", @@ -8094,8 +8095,10 @@ version = "0.23.0-alpha.1+dev" dependencies = [ "ahash", "anyhow", + "arrow", "bytemuck", "cfg-if", + "crossbeam", "eframe", "egui", "egui-wgpu", @@ -8507,6 +8510,7 @@ dependencies = [ "anyhow", "arrow", "clap", + "crossbeam", "document-features", "env_filter", "itertools 0.14.0", diff --git a/crates/store/re_grpc_server/Cargo.toml b/crates/store/re_grpc_server/Cargo.toml index b24f20cb4fb4..254d38c03bb0 100644 --- a/crates/store/re_grpc_server/Cargo.toml +++ b/crates/store/re_grpc_server/Cargo.toml @@ -34,6 +34,7 @@ re_tracing.workspace = true re_types.workspace = true # External +crossbeam.workspace = true parking_lot.workspace = true tonic = { workspace = true, default-features = false, features = ["transport"] } tonic-web.workspace = true diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 069d51ed7ac6..7fd19643c9c2 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -6,7 +6,13 @@ use std::collections::VecDeque; use std::net::SocketAddr; use std::pin::Pin; +use re_log_encoding::codec::wire::decoder::Decode as _; +use re_log_types::TableMsg; +use re_protos::sdk_comms::v1alpha1::ReadTablesRequest; +use re_protos::sdk_comms::v1alpha1::ReadTablesResponse; use re_protos::sdk_comms::v1alpha1::WriteMessagesRequest; +use re_protos::sdk_comms::v1alpha1::WriteTableRequest; +use re_protos::sdk_comms::v1alpha1::WriteTableResponse; use tokio::net::TcpListener; use tokio::sync::broadcast; use tokio::sync::mpsc; @@ -21,7 +27,9 @@ use tower_http::cors::CorsLayer; use re_byte_size::SizeBytes as _; use re_memory::MemoryLimit; use re_protos::{ - common::v1alpha1::StoreKind as StoreKindProto, + common::v1alpha1::{ + DataframePart as 
DataframePartProto, StoreKind as StoreKindProto, TableId as TableIdProto,
+    },
     log_msg::v1alpha1::LogMsg as LogMsgProto,
     sdk_comms::v1alpha1::{
         message_proxy_service_server, ReadMessagesRequest, ReadMessagesResponse,
@@ -247,8 +255,11 @@ pub fn spawn_with_recv(
     addr: SocketAddr,
     memory_limit: MemoryLimit,
     shutdown: shutdown::Shutdown,
-) -> re_smart_channel::Receiver<re_log_types::LogMsg> {
-    let (channel_tx, channel_rx) = re_smart_channel::smart_channel(
+) -> (
+    re_smart_channel::Receiver<re_log_types::LogMsg>,
+    crossbeam::channel::Receiver<TableMsg>,
+) {
+    let (channel_log_tx, channel_log_rx) = re_smart_channel::smart_channel(
         re_smart_channel::SmartMessageSource::MessageProxy {
             url: format!("rerun+http://{addr}/proxy"),
         },
@@ -256,7 +267,9 @@ pub fn spawn_with_recv(
             url: format!("rerun+http://{addr}/proxy"),
         },
     );
-    let (message_proxy, mut broadcast_rx) = MessageProxy::new_with_recv(memory_limit);
+    let (channel_table_tx, channel_table_rx) = crossbeam::channel::unbounded();
+    let (message_proxy, mut broadcast_log_rx, mut broadcast_table_rx) =
+        MessageProxy::new_with_recv(memory_limit);
     tokio::spawn(async move {
         if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
             re_log::error!("message proxy server crashed: {err}");
@@ -264,11 +277,11 @@ pub fn spawn_with_recv(
     });
     tokio::spawn(async move {
         loop {
-            let msg = match broadcast_rx.recv().await {
+            let msg = match broadcast_log_rx.recv().await {
                 Ok(msg) => re_log_encoding::protobuf_conversions::log_msg_from_proto(msg),
                 Err(broadcast::error::RecvError::Closed) => {
                     re_log::debug!("message proxy server shut down, closing receiver");
-                    channel_tx.quit(None).ok();
+                    channel_log_tx.quit(None).ok();
                     break;
                 }
                 Err(broadcast::error::RecvError::Lagged(n)) => {
@@ -280,7 +293,7 @@ pub fn spawn_with_recv(
             };
             match msg {
                 Ok(msg) => {
-                    if channel_tx.send(msg).is_err() {
+                    if channel_log_tx.send(msg).is_err() {
                         re_log::debug!(
                             "message proxy smart channel receiver closed, closing sender"
                         );
@@ -294,15 +307,92 @@ pub fn spawn_with_recv(
             }
         }
     });
-    channel_rx
+    tokio::spawn(async move {
+        loop {
+            let msg = match broadcast_table_rx.recv().await {
+                Ok(msg) => msg.data.decode().map(|data| TableMsg {
+                    id: msg.id.into(),
+                    data,
+                }),
+                Err(broadcast::error::RecvError::Closed) => {
+                    re_log::debug!("message proxy server shut down, closing receiver");
+                    // We don't have to explicitly call `quit` on crossbeam channels.
+                    break;
+                }
+                Err(broadcast::error::RecvError::Lagged(n)) => {
+                    re_log::debug!(
+                        "message proxy receiver dropped {n} messages due to backpressure"
+                    );
+                    continue;
+                }
+            };
+            match msg {
+                Ok(msg) => {
+                    if channel_table_tx.send(msg).is_err() {
+                        re_log::debug!(
+                            "message proxy table channel receiver closed, closing sender"
+                        );
+                        break;
+                    }
+                }
+                Err(err) => {
+                    re_log::error!("dropping table due to failed decode: {err}");
+                    continue;
+                }
+            }
+        }
+    });
+    (channel_log_rx, channel_table_rx)
 }
 
 enum Event {
     /// New client connected, requesting full history and subscribing to new messages.
-    NewClient(oneshot::Sender<(Vec<LogMsgProto>, broadcast::Receiver<LogMsgProto>)>),
+    NewClient(
+        oneshot::Sender<(
+            Vec<Msg>,
+            broadcast::Receiver<LogMsgProto>,
+            broadcast::Receiver<TableMsgProto>,
+        )>,
+    ),
 
     /// A client sent a message.
     Message(LogMsgProto),
+
+    /// A client sent a table.
+    Table(TableMsgProto),
+}
+
+#[derive(Clone)]
+struct TableMsgProto {
+    id: TableIdProto,
+    data: DataframePartProto,
+}
+
+#[derive(Clone)]
+enum Msg {
+    LogMsg(LogMsgProto),
+    Table(TableMsgProto),
+}
+
+impl Msg {
+    fn total_size_bytes(&self) -> u64 {
+        match self {
+            Self::LogMsg(log_msg) => message_size(log_msg),
+            Self::Table(table) => table_size(table),
+        }
+    }
+}
+
+impl From<LogMsgProto> for Msg {
+    fn from(value: LogMsgProto) -> Self {
+        Self::LogMsg(value)
+    }
+}
+
+impl From<TableMsgProto> for Msg {
+    fn from(value: TableMsgProto) -> Self {
+        Self::Table(value)
+    }
 }
 
 /// Main event loop for the server, which runs in its own task.
@@ -311,16 +401,19 @@ enum Event {
 struct EventLoop {
     server_memory_limit: MemoryLimit,
 
-    /// New messages are broadcast to all clients.
-    broadcast_tx: broadcast::Sender<LogMsgProto>,
+    /// New log messages are broadcast to all clients.
+    broadcast_log_tx: broadcast::Sender<LogMsgProto>,
+
+    /// New table messages are broadcast to all clients.
+    broadcast_table_tx: broadcast::Sender<TableMsgProto>,
 
     /// Channel for incoming events.
     event_rx: mpsc::Receiver<Event>,
 
     /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit.
-    ordered_message_queue: VecDeque<LogMsgProto>,
+    ordered_message_queue: VecDeque<Msg>,
 
-    /// Total size of `temporal_message_queue` in bytes.
+    /// Total size of `ordered_message_queue` in bytes.
     ordered_message_bytes: u64,
 
     /// Messages potentially out of order with the rest of the message stream. These are never garbage collected.
@@ -331,11 +424,13 @@ impl EventLoop {
     fn new(
         server_memory_limit: MemoryLimit,
         event_rx: mpsc::Receiver<Event>,
-        broadcast_tx: broadcast::Sender<LogMsgProto>,
+        broadcast_log_tx: broadcast::Sender<LogMsgProto>,
+        broadcast_table_tx: broadcast::Sender<TableMsgProto>,
     ) -> Self {
         Self {
             server_memory_limit,
-            broadcast_tx,
+            broadcast_log_tx,
+            broadcast_table_tx,
             event_rx,
             ordered_message_queue: Default::default(),
             ordered_message_bytes: 0,
@@ -352,13 +447,18 @@ impl EventLoop {
             match event {
                 Event::NewClient(channel) => self.handle_new_client(channel),
                 Event::Message(msg) => self.handle_msg(msg),
+                Event::Table(table) => self.handle_table(table),
             }
         }
     }
 
     fn handle_new_client(
         &self,
-        channel: oneshot::Sender<(Vec<LogMsgProto>, broadcast::Receiver<LogMsgProto>)>,
+        channel: oneshot::Sender<(
+            Vec<Msg>,
+            broadcast::Receiver<LogMsgProto>,
+            broadcast::Receiver<TableMsgProto>,
+        )>,
     ) {
         channel
             .send((
@@ -366,15 +466,17 @@ impl EventLoop {
                 self.persistent_message_queue
                     .iter()
                     .cloned()
+                    .map(Msg::from)
                     .chain(self.ordered_message_queue.iter().cloned())
                     .collect(),
-                self.broadcast_tx.subscribe(),
+                self.broadcast_log_tx.subscribe(),
+                self.broadcast_table_tx.subscribe(),
             ))
             .ok();
     }
 
     fn handle_msg(&mut self, msg: LogMsgProto) {
-        self.broadcast_tx.send(msg.clone()).ok();
+        self.broadcast_log_tx.send(msg.clone()).ok();
 
         self.gc_if_using_too_much_ram();
 
@@ -409,11 +511,21 @@ impl EventLoop {
             Msg::ArrowMsg(..)
=> { let approx_size_bytes = message_size(&msg); self.ordered_message_bytes += approx_size_bytes; - self.ordered_message_queue.push_back(msg); + self.ordered_message_queue.push_back(msg.into()); } } } + fn handle_table(&mut self, table: TableMsgProto) { + self.broadcast_table_tx.send(table.clone()).ok(); + + self.gc_if_using_too_much_ram(); + + let approx_size_bytes = table_size(&table); + self.ordered_message_bytes += approx_size_bytes; + self.ordered_message_queue.push_back(Msg::Table(table)); + } + fn gc_if_using_too_much_ram(&mut self) { re_tracing::profile_function!(); @@ -443,7 +555,7 @@ impl EventLoop { while bytes_dropped < bytes_to_free { // only drop messages from temporal queue if let Some(msg) = self.ordered_message_queue.pop_front() { - bytes_dropped += message_size(&msg); + bytes_dropped += msg.total_size_bytes(); messages_dropped += 1; } else { break; @@ -462,6 +574,11 @@ fn message_size(msg: &LogMsgProto) -> u64 { msg.total_size_bytes() } +fn table_size(table: &TableMsgProto) -> u64 { + let TableMsgProto { id, data } = table; + id.total_size_bytes() + data.total_size_bytes() +} + pub struct MessageProxy { _queue_task_handle: tokio::task::JoinHandle<()>, event_tx: mpsc::Sender, @@ -472,16 +589,28 @@ impl MessageProxy { Self::new_with_recv(server_memory_limit).0 } - fn new_with_recv(server_memory_limit: MemoryLimit) -> (Self, broadcast::Receiver) { + fn new_with_recv( + server_memory_limit: MemoryLimit, + ) -> ( + Self, + broadcast::Receiver, + broadcast::Receiver, + ) { // Channel capacity is completely arbitrary. // We just want something large enough to handle bursts of messages. let (event_tx, event_rx) = mpsc::channel(1024); - let (broadcast_tx, broadcast_rx) = broadcast::channel(1024); + let (broadcast_log_tx, broadcast_log_rx) = broadcast::channel(1024); + let (broadcast_table_tx, broadcast_table_rx) = broadcast::channel(1024); let task_handle = tokio::spawn(async move { - EventLoop::new(server_memory_limit, event_rx, broadcast_tx) - .run_in_place() - .await; + EventLoop::new( + server_memory_limit, + event_rx, + broadcast_log_tx, + broadcast_table_tx, + ) + .run_in_place() + .await; }); ( @@ -489,21 +618,26 @@ impl MessageProxy { _queue_task_handle: task_handle, event_tx, }, - broadcast_rx, + broadcast_log_rx, + broadcast_table_rx, ) } - async fn push(&self, msg: LogMsgProto) { + async fn push_msg(&self, msg: LogMsgProto) { self.event_tx.send(Event::Message(msg)).await.ok(); } - async fn new_client_stream(&self) -> ReadMessagesStream { + async fn push_table(&self, table: TableMsgProto) { + self.event_tx.send(Event::Table(table)).await.ok(); + } + + async fn new_client_message_stream(&self) -> ReadMessagesStream { let (sender, receiver) = oneshot::channel(); if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await { re_log::error!("Error initializing new client: {err}"); return Box::pin(tokio_stream::empty()); }; - let (history, channel) = match receiver.await { + let (history, log_channel, _) = match receiver.await { Ok(v) => v, Err(err) => { re_log::error!("Error initializing new client: {err}"); @@ -514,12 +648,18 @@ impl MessageProxy { let history = tokio_stream::iter( history .into_iter() - .map(|log_msg| ReadMessagesResponse { - log_msg: Some(log_msg), + .filter_map(|log_msg| { + if let Msg::LogMsg(log_msg) = log_msg { + Some(ReadMessagesResponse { + log_msg: Some(log_msg), + }) + } else { + None + } }) .map(Ok), ); - let channel = BroadcastStream::new(channel).map(|result| { + let channel = BroadcastStream::new(log_channel).map(|result| { result 
                .map(|log_msg| ReadMessagesResponse {
                    log_msg: Some(log_msg),
                })
                .map_err(|err| {
                    re_log::error!("Error reading message from broadcast channel: {err}");
                    tonic::Status::internal("internal channel error")
                })
        });

        Box::pin(history.chain(channel))
    }
+
+    async fn new_client_table_stream(&self) -> ReadTablesStream {
+        let (sender, receiver) = oneshot::channel();
+        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
+            re_log::error!("Error initializing new client: {err}");
+            return Box::pin(tokio_stream::empty());
+        };
+        let (history, _, table_channel) = match receiver.await {
+            Ok(v) => v,
+            Err(err) => {
+                re_log::error!("Error initializing new client: {err}");
+                return Box::pin(tokio_stream::empty());
+            }
+        };
+
+        let history = tokio_stream::iter(
+            history
+                .into_iter()
+                .filter_map(|table| {
+                    if let Msg::Table(table) = table {
+                        Some(ReadTablesResponse {
+                            id: Some(table.id),
+                            data: Some(table.data),
+                        })
+                    } else {
+                        None
+                    }
+                })
+                .map(Ok),
+        );
+        let channel = BroadcastStream::new(table_channel).map(|result| {
+            result
+                .map(|table| ReadTablesResponse {
+                    id: Some(table.id),
+                    data: Some(table.data),
+                })
+                .map_err(|err| {
+                    re_log::error!("Error reading message from broadcast channel: {err}");
+                    tonic::Status::internal("internal channel error")
+                })
+        });
+
+        Box::pin(history.chain(channel))
+    }
 }
 
 type ReadMessagesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadMessagesResponse>> + Send>>;
+type ReadTablesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadTablesResponse>> + Send>>;
 
 #[tonic::async_trait]
 impl message_proxy_service_server::MessageProxyService for MessageProxy {
@@ -548,7 +733,7 @@ impl message_proxy_service_server::MessageProxyService for MessageProxy {
             Ok(Some(WriteMessagesRequest {
                 log_msg: Some(log_msg),
             })) => {
-                self.push(log_msg).await;
+                self.push_msg(log_msg).await;
             }
 
             Ok(Some(WriteMessagesRequest { log_msg: None })) => {
@@ -576,7 +761,33 @@ impl message_proxy_service_server::MessageProxyService for MessageProxy {
         &self,
         _: tonic::Request<ReadMessagesRequest>,
     ) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
-        Ok(tonic::Response::new(self.new_client_stream().await))
+        Ok(tonic::Response::new(self.new_client_message_stream().await))
+    }
+
+    type ReadTablesStream = ReadTablesStream;
+
+    async fn write_table(
+        &self,
+        request: tonic::Request<WriteTableRequest>,
+    ) -> tonic::Result<tonic::Response<WriteTableResponse>> {
+        if let WriteTableRequest {
+            id: Some(id),
+            data: Some(data),
+        } = request.into_inner()
+        {
+            self.push_table(TableMsgProto { id, data }).await;
+        } else {
+            re_log::warn!("malformed `WriteTableRequest`");
+        }
+
+        Ok(tonic::Response::new(WriteTableResponse {}))
+    }
+
+    async fn read_tables(
+        &self,
+        _: tonic::Request<ReadTablesRequest>,
+    ) -> tonic::Result<tonic::Response<Self::ReadTablesStream>> {
+        Ok(tonic::Response::new(self.new_client_table_stream().await))
     }
 }
 
diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index 301fd8255eea..9079efb91113 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -640,6 +640,61 @@ impl std::fmt::Display for StoreSource {
 
 // ---
 
+#[must_use]
+#[derive(Clone, Debug, PartialEq)]
+pub struct TableMsg {
+    /// The id of the table.
+    pub id: TableId,
+
+    /// The table stored as an [`ArrowRecordBatch`].
+    pub data: ArrowRecordBatch,
+}
+
+impl TableMsg {
+    /// Returns the [`TableMsg`] encoded as a record batch.
+    // This is required to send bytes to a viewer running in a notebook.
+    // If you ever change this, you also need to adapt `notebook.py` too.
+    pub fn to_arrow_encoded(&self) -> Result<ArrowRecordBatch, ArrowError> {
+        let current_schema = self.data.schema();
+        let mut metadata = current_schema.metadata().clone();
+        metadata.insert("__table_id".to_owned(), self.id.as_str().to_owned());
+
+        // Create a new schema with the updated metadata
+        let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
+            current_schema.fields().clone(),
+            metadata,
+        ));
+
+        // Create a new record batch with the same data but updated schema
+        ArrowRecordBatch::try_new(new_schema, self.data.columns().to_vec())
+    }
+
+    /// Returns the [`TableMsg`] back from an encoded record batch.
+    // This is required to send bytes around in the notebook.
+    // If you ever change this, you also need to adapt `notebook.py` too.
+    pub fn from_arrow_encoded(data: &ArrowRecordBatch) -> Option<Self> {
+        let mut metadata = data.schema().metadata().clone();
+        let id = metadata.remove("__table_id")?;
+
+        let data = ArrowRecordBatch::try_new(
+            Arc::new(arrow::datatypes::Schema::new_with_metadata(
+                data.schema().fields().clone(),
+                metadata,
+            )),
+            data.columns().to_vec(),
+        )
+        .ok()?;
+
+        Some(Self {
+            id: TableId::new(id),
+            data,
+        })
+    }
+}
+
+// ---
+
 /// Build a ([`Timeline`], [`TimeInt`]) tuple from `log_time` suitable for inserting in a [`TimePoint`].
 #[inline]
 pub fn build_log_time(log_time: Timestamp) -> (Timeline, TimeInt) {
@@ -790,7 +845,7 @@ impl SizeBytes for LogMsg {
 
 /// USE ONLY FOR TESTS
 // TODO(#3741): remove once is released
-use arrow::array::RecordBatch as ArrowRecordBatch;
+use arrow::{array::RecordBatch as ArrowRecordBatch, datatypes::Field, error::ArrowError};
 
 pub fn strip_arrow_extension_types_from_batch(batch: &mut ArrowRecordBatch) {
     use arrow::datatypes::{Field, Schema};
@@ -912,4 +967,53 @@ mod tests {
             }
         );
     }
+
+    #[test]
+    fn table_msg_concatenated_roundtrip() {
+        use arrow::{
+            array::{ArrayRef, StringArray, UInt64Array},
+            datatypes::{DataType, Field, Schema},
+        };
+
+        let data = {
+            let schema = Arc::new(Schema::new_with_metadata(
+                vec![
+                    Field::new("id", DataType::UInt64, false),
+                    Field::new("name", DataType::Utf8, false),
+                ],
+                Default::default(),
+            ));
+
+            // Create a UInt64 array
+            let id_array = UInt64Array::from(vec![1, 2, 3, 4, 5]);
+
+            // Create a String array
+            let name_array = StringArray::from(vec![
+                "Alice",
+                "Bob",
+                "Charlie",
+                "Dave",
+                "http://www.rerun.io",
+            ]);
+
+            // Convert arrays to ArrayRef (trait objects)
+            let arrays: Vec<ArrayRef> = vec![
+                Arc::new(id_array) as ArrayRef,
+                Arc::new(name_array) as ArrayRef,
+            ];
+
+            // Create a RecordBatch
+            ArrowRecordBatch::try_new(schema, arrays).unwrap()
+        };
+
+        let msg = TableMsg {
+            id: TableId::new("test123".to_owned()),
+            data,
+        };
+
+        let encoded = msg.to_arrow_encoded().expect("to encoded failed");
+        let decoded = TableMsg::from_arrow_encoded(&encoded).expect("from encoded failed");
+
+        assert_eq!(msg, decoded);
+    }
 }
diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/common.proto b/crates/store/re_protos/proto/rerun/v1alpha1/common.proto
index 8bcaedda5f59..1625bd276f82 100644
--- a/crates/store/re_protos/proto/rerun/v1alpha1/common.proto
+++ b/crates/store/re_protos/proto/rerun/v1alpha1/common.proto
@@ -28,6 +28,11 @@ message RecordingId {
   string id = 1;
 }
 
+// uniquely identifies a table
+message TableId {
+  string id = 1;
+}
+
 // A recording can have multiple timelines, each is identified by a name, for example `log_tick`, `log_time`, etc.
message Timeline { string name = 1; diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto b/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto index 14ba6d2c9c2f..54d183aad1f2 100644 --- a/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto +++ b/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package rerun.sdk_comms.v1alpha1; +import "rerun/v1alpha1/common.proto"; import "rerun/v1alpha1/log_msg.proto"; // Simple buffer for messages between SDKs and viewers. @@ -18,6 +19,9 @@ service MessageProxyService { // It may allow us to amortize the overhead of the gRPC protocol. rpc WriteMessages(stream WriteMessagesRequest) returns (WriteMessagesResponse) {} rpc ReadMessages(ReadMessagesRequest) returns (stream ReadMessagesResponse) {} + + rpc WriteTable(WriteTableRequest) returns (WriteTableResponse) {} + rpc ReadTables(ReadTablesRequest) returns (stream ReadTablesResponse) {} } // WriteMessages @@ -35,3 +39,22 @@ message ReadMessagesRequest {} message ReadMessagesResponse { rerun.log_msg.v1alpha1.LogMsg log_msg = 1; } + +// WriteTable + +message WriteTableRequest { + rerun.common.v1alpha1.TableId id = 1; + rerun.common.v1alpha1.DataframePart data = 2; +} + +message WriteTableResponse {} + +// ReadTable + +message ReadTablesRequest {} + +message ReadTablesResponse { + rerun.common.v1alpha1.TableId id = 1; + rerun.common.v1alpha1.DataframePart data = 2; +} + diff --git a/crates/store/re_protos/src/lib.rs b/crates/store/re_protos/src/lib.rs index 0f386ad28225..db14ae78b342 100644 --- a/crates/store/re_protos/src/lib.rs +++ b/crates/store/re_protos/src/lib.rs @@ -288,6 +288,15 @@ mod sizes { } } + impl SizeBytes for crate::common::v1alpha1::TableId { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { id } = self; + + id.heap_size_bytes() + } + } + impl SizeBytes for crate::log_msg::v1alpha1::StoreSource { #[inline] fn heap_size_bytes(&self) -> u64 { @@ -348,4 +357,16 @@ mod sizes { + make_default.heap_size_bytes() } } + + impl SizeBytes for crate::common::v1alpha1::DataframePart { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + encoder_version, + payload, + } = self; + + encoder_version.heap_size_bytes() + payload.heap_size_bytes() + } + } } diff --git a/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs index 289ce976e2dd..4ab338cf440d 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError}; -use re_log_types::external::re_types_core::ComponentDescriptor; +use re_log_types::{external::re_types_core::ComponentDescriptor, TableId}; use crate::{invalid_field, missing_field, TypeConversionError}; @@ -402,6 +402,22 @@ impl From for crate::common::v1alpha1::RecordingId { } } +impl From for crate::common::v1alpha1::TableId { + #[inline] + fn from(value: re_log_types::TableId) -> Self { + Self { + id: value.as_str().to_owned(), + } + } +} + +impl From for re_log_types::TableId { + #[inline] + fn from(value: crate::common::v1alpha1::TableId) -> Self { + TableId::from(value.id) + } +} + impl From for crate::log_msg::v1alpha1::StoreSource { #[inline] fn from(value: re_log_types::StoreSource) -> Self { diff --git a/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.rs 
b/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.rs index cb0cd8a4df9c..c4649892ef40 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.rs @@ -38,6 +38,22 @@ impl ::prost::Name for RecordingId { "/rerun.common.v1alpha1.RecordingId".into() } } +/// uniquely identifies a table +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TableId { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, +} +impl ::prost::Name for TableId { + const NAME: &'static str = "TableId"; + const PACKAGE: &'static str = "rerun.common.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.common.v1alpha1.TableId".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.common.v1alpha1.TableId".into() + } +} /// A recording can have multiple timelines, each is identified by a name, for example `log_tick`, `log_time`, etc. #[derive(Clone, PartialEq, ::prost::Message)] pub struct Timeline { diff --git a/crates/store/re_protos/src/v1alpha1/rerun.sdk_comms.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.sdk_comms.v1alpha1.rs index edd52a79936d..e8b8bf1a319e 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.sdk_comms.v1alpha1.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.sdk_comms.v1alpha1.rs @@ -53,6 +53,64 @@ impl ::prost::Name for ReadMessagesResponse { "/rerun.sdk_comms.v1alpha1.ReadMessagesResponse".into() } } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteTableRequest { + #[prost(message, optional, tag = "1")] + pub id: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub data: ::core::option::Option, +} +impl ::prost::Name for WriteTableRequest { + const NAME: &'static str = "WriteTableRequest"; + const PACKAGE: &'static str = "rerun.sdk_comms.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.sdk_comms.v1alpha1.WriteTableRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.sdk_comms.v1alpha1.WriteTableRequest".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct WriteTableResponse {} +impl ::prost::Name for WriteTableResponse { + const NAME: &'static str = "WriteTableResponse"; + const PACKAGE: &'static str = "rerun.sdk_comms.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.sdk_comms.v1alpha1.WriteTableResponse".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.sdk_comms.v1alpha1.WriteTableResponse".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadTablesRequest {} +impl ::prost::Name for ReadTablesRequest { + const NAME: &'static str = "ReadTablesRequest"; + const PACKAGE: &'static str = "rerun.sdk_comms.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.sdk_comms.v1alpha1.ReadTablesRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.sdk_comms.v1alpha1.ReadTablesRequest".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadTablesResponse { + #[prost(message, optional, tag = "1")] + pub id: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub data: ::core::option::Option, +} +impl ::prost::Name for ReadTablesResponse { + const NAME: &'static str = "ReadTablesResponse"; + const PACKAGE: &'static str = "rerun.sdk_comms.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.sdk_comms.v1alpha1.ReadTablesResponse".into() + } + fn type_url() -> 
::prost::alloc::string::String { + "/rerun.sdk_comms.v1alpha1.ReadTablesResponse".into() + } +} /// Generated client implementations. pub mod message_proxy_service_client { #![allow( @@ -183,6 +241,46 @@ pub mod message_proxy_service_client { )); self.inner.server_streaming(req, path, codec).await } + pub async fn write_table( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.sdk_comms.v1alpha1.MessageProxyService/WriteTable", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.sdk_comms.v1alpha1.MessageProxyService", + "WriteTable", + )); + self.inner.unary(req, path, codec).await + } + pub async fn read_tables( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.sdk_comms.v1alpha1.MessageProxyService/ReadTables", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.sdk_comms.v1alpha1.MessageProxyService", + "ReadTables", + )); + self.inner.server_streaming(req, path, codec).await + } } } /// Generated server implementations. @@ -213,6 +311,19 @@ pub mod message_proxy_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn write_table( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ReadTables method. + type ReadTablesStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + + 'static; + async fn read_tables( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } /// Simple buffer for messages between SDKs and viewers. 
/// @@ -382,6 +493,91 @@ pub mod message_proxy_service_server { }; Box::pin(fut) } + "/rerun.sdk_comms.v1alpha1.MessageProxyService/WriteTable" => { + #[allow(non_camel_case_types)] + struct WriteTableSvc(pub Arc); + impl + tonic::server::UnaryService for WriteTableSvc + { + type Response = super::WriteTableResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::write_table(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WriteTableSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/rerun.sdk_comms.v1alpha1.MessageProxyService/ReadTables" => { + #[allow(non_camel_case_types)] + struct ReadTablesSvc(pub Arc); + impl + tonic::server::ServerStreamingService + for ReadTablesSvc + { + type Response = super::ReadTablesResponse; + type ResponseStream = T::ReadTablesStream; + type Future = + BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::read_tables(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadTablesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { let mut response = http::Response::new(empty_body()); let headers = response.headers_mut(); diff --git a/crates/top/rerun/Cargo.toml b/crates/top/rerun/Cargo.toml index 91f87eb9cdf9..20cbe3dbbc57 100644 --- a/crates/top/rerun/Cargo.toml +++ b/crates/top/rerun/Cargo.toml @@ -139,6 +139,7 @@ re_video.workspace = true anyhow.workspace = true arrow.workspace = true +crossbeam.workspace = true document-features.workspace = true itertools.workspace = true similar-asserts.workspace = true diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs index 60e3160e302e..992f9d4e3da8 100644 --- a/crates/top/rerun/src/commands/entrypoint.rs +++ b/crates/top/rerun/src/commands/entrypoint.rs @@ -4,8 +4,9 @@ use clap::{CommandFactory as _, Subcommand}; use itertools::Itertools as _; use tokio::runtime::Runtime; +use crossbeam::channel::Receiver as CrossbeamReceiver; use re_data_source::DataSource; 
-use re_log_types::LogMsg;
+use re_log_types::{LogMsg, TableMsg};
 use re_sdk::sink::LogSink as _;
 use re_smart_channel::{ReceiveSet, Receiver, SmartMessagePayload};
 
@@ -717,7 +718,7 @@ fn run_impl(
     // Where do we get the data from?
     let mut catalog_endpoints: Vec<_> = Vec::new();
 
-    let rxs: Vec<Receiver<LogMsg>> = {
+    let (rxs_log, rxs_table): (Vec<Receiver<LogMsg>>, Vec<CrossbeamReceiver<TableMsg>>) = {
         let data_sources = args
             .url_or_paths
             .iter()
@@ -764,7 +765,8 @@ fn run_impl(
             }
         });
 
-        let mut rxs = data_sources
+        let mut rxs_table = Vec::new();
+        let mut rxs_logs = data_sources
             .into_iter()
             .filter_map(
                 |data_source| match data_source.stream(on_cmd.clone(), None) {
@@ -785,7 +787,7 @@ fn run_impl(
                 anyhow::bail!("expected `/proxy` endpoint");
             };
             let rx = re_sdk::external::re_grpc_client::message_proxy::stream(endpoint, None);
-            rxs.push(rx);
+            rxs_logs.push(rx);
         } else {
             // Check if there is already a viewer running and if so, send the data to it.
             use std::net::TcpStream;
@@ -801,16 +803,20 @@ fn run_impl(
                 // For that we spawn the server a bit further down, after we've collected
                 // all receivers into `rxs`.
             } else if !args.serve && !args.serve_web && !args.serve_grpc {
-                let server: Receiver<LogMsg> = re_grpc_server::spawn_with_recv(
+                let (log_server, table_server): (
+                    Receiver<LogMsg>,
+                    crossbeam::channel::Receiver<TableMsg>,
+                ) = re_grpc_server::spawn_with_recv(
                     server_addr,
                     server_memory_limit,
                     re_grpc_server::shutdown::never(),
                 );
-                rxs.push(server);
+                rxs_logs.push(log_server);
+                rxs_table.push(table_server);
             }
         }
 
-        rxs
+        (rxs_logs, rxs_table)
     };
 
     // Now what do we do with the data?
@@ -820,14 +826,14 @@ fn run_impl(
             anyhow::bail!("`--test-receive` does not support catalogs");
         }
 
-        let rx = ReceiveSet::new(rxs);
+        let rx = ReceiveSet::new(rxs_log);
         assert_receive_into_entity_db(&rx).map(|_db| ())
     } else if let Some(rrd_path) = args.save {
         if !catalog_endpoints.is_empty() {
             anyhow::bail!("`--save` does not support catalogs");
         }
 
-        let rx = ReceiveSet::new(rxs);
+        let rx = ReceiveSet::new(rxs_log);
         Ok(stream_to_rrd_on_disk(&rx, &rrd_path.into())?)
     } else if args.serve_grpc {
         if !catalog_endpoints.is_empty() {
@@ -835,7 +841,7 @@ fn run_impl(
         }
 
         if !cfg!(feature = "server") {
-            _ = (call_source, rxs);
+            _ = (call_source, rxs_log);
             anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature");
         }
 
@@ -848,7 +854,7 @@ fn run_impl(
             server_addr,
             server_memory_limit,
             shutdown,
-            ReceiveSet::new(rxs),
+            ReceiveSet::new(rxs_log),
         );
 
         // Gracefully shut down the server on SIGINT
@@ -864,7 +870,7 @@ fn run_impl(
         }
 
         if !cfg!(feature = "server") {
-            _ = (call_source, rxs);
+            _ = (call_source, rxs_log);
             anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature");
         }
 
@@ -891,7 +897,7 @@ fn run_impl(
             server_addr,
             server_memory_limit,
             re_grpc_server::shutdown::never(),
-            ReceiveSet::new(rxs),
+            ReceiveSet::new(rxs_log),
         );
 
         // We always host the web-viewer in case the users wants it,
@@ -932,7 +938,7 @@ fn run_impl(
         // no need to `rt.enter()`:
         let sink = re_sdk::sink::GrpcSink::new(endpoint, crate::default_flush_timeout());
 
-        for rx in rxs {
+        for rx in rxs_log {
             while rx.is_connected() {
                 while let Ok(msg) = rx.recv() {
                     if let Some(log_msg) = msg.into_data() {
@@ -977,8 +983,13 @@ fn run_impl(
                 re_viewer::AsyncRuntimeHandle::new_native(tokio_runtime_handle),
                 (command_sender, command_receiver),
             );
-            for rx in rxs {
-                app.add_receiver(rx);
+            for rx in rxs_log {
+                app.add_log_receiver(rx);
+            }
+            for rx in rxs_table {
+                app.add_table_receiver(rx);
             }
             app.set_profiler(profiler);
             if let Ok(url) = std::env::var("EXAMPLES_MANIFEST_URL") {
@@ -995,7 +1006,7 @@ fn run_impl(
     }
     #[cfg(not(feature = "native_viewer"))]
     {
-        _ = (call_source, rxs);
+        _ = (call_source, rxs_log);
         anyhow::bail!(
             "Can't start viewer - rerun was compiled without the 'native_viewer' feature"
         );
diff --git a/crates/viewer/re_viewer/Cargo.toml b/crates/viewer/re_viewer/Cargo.toml
index d8eb17b30d6b..3f6e1e10e0be 100644
--- a/crates/viewer/re_viewer/Cargo.toml
+++ b/crates/viewer/re_viewer/Cargo.toml
@@ -103,8 +103,10 @@ egui_table.workspace = true
 # External
 ahash.workspace = true
 anyhow.workspace = true
+arrow.workspace = true
 bytemuck.workspace = true
 cfg-if.workspace = true
+crossbeam.workspace = true
 eframe = { workspace = true, default-features = false, features = [
   "default_fonts",
   "persistence",
diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs
index 1b196fdce0e9..599d79466d99 100644
--- a/crates/viewer/re_viewer/src/app.rs
+++ b/crates/viewer/re_viewer/src/app.rs
@@ -7,7 +7,7 @@ use re_capabilities::MainThreadToken;
 use re_chunk::TimelineName;
 use re_data_source::{DataSource, FileContents};
 use re_entity_db::entity_db::EntityDb;
-use re_log_types::{ApplicationId, FileSource, LogMsg, StoreKind};
+use re_log_types::{ApplicationId, FileSource, LogMsg, StoreKind, TableMsg};
 use re_renderer::WgpuResourcePoolStatistics;
 use re_smart_channel::{ReceiveSet, SmartChannelSource};
 use re_ui::{notifications, DesignTokens, UICommand, UICommandSender as _};
@@ -16,7 +16,7 @@ use re_viewer_context::{
     store_hub::{BlueprintPersistence, StoreHub, StoreHubStats},
     AppOptions, AsyncRuntimeHandle, BlueprintUndoState, CommandReceiver, CommandSender,
     ComponentUiRegistry, DisplayMode, PlayState, StorageContext, StoreContext, SystemCommand,
-    SystemCommandSender as _, ViewClass, ViewClassRegistry, ViewClassRegistryError,
+    SystemCommandSender as _, TableStore, ViewClass, ViewClassRegistry, ViewClassRegistryError,
 };
 
 use crate::{
@@ -171,6 +171,8 @@ struct PendingFilePromise {
     promise: poll_promise::Promise<Option<FileContents>>,
 }
 
+type ReceiveSetTable = parking_lot::Mutex<Vec<crossbeam::channel::Receiver<TableMsg>>>;
+
 /// The Rerun Viewer as an [`eframe`] application.
 pub struct App {
     #[allow(dead_code)] // Unused on wasm32
@@ -193,7 +195,8 @@ pub struct App {
 
     component_ui_registry: ComponentUiRegistry,
 
-    rx: ReceiveSet<LogMsg>,
+    rx_log: ReceiveSet<LogMsg>,
+    rx_table: ReceiveSetTable,
 
     #[cfg(target_arch = "wasm32")]
     open_files_promise: Option<PendingFilePromise>,
@@ -395,7 +398,8 @@ impl App {
             text_log_rx,
             component_ui_registry,
 
-            rx: Default::default(),
+            rx_log: Default::default(),
+            rx_table: Default::default(),
             #[cfg(target_arch = "wasm32")]
             open_files_promise: Default::default(),
             state,
@@ -460,29 +464,25 @@ impl App {
     }
 
     #[allow(clippy::needless_pass_by_ref_mut)]
-    pub fn add_receiver(&mut self, rx: re_smart_channel::Receiver<LogMsg>) {
+    pub fn add_log_receiver(&mut self, rx: re_smart_channel::Receiver<LogMsg>) {
         // Make sure we wake up when a message is sent.
         #[cfg(not(target_arch = "wasm32"))]
         let rx = crate::wake_up_ui_thread_on_each_msg(rx, self.egui_ctx.clone());
 
-        // Add unknown redap servers.
-        //
-        // Otherwise we end up in a situation where we have a data from an unknown server,
-        // which is unnecessary and can get us into a strange ui state.
-        if let SmartChannelSource::RedapGrpcStream(endpoint) = rx.source() {
-            if !self.state.redap_servers.has_server(&endpoint.origin) {
-                self.command_sender
-                    .send_system(SystemCommand::AddRedapServer {
-                        endpoint: re_uri::CatalogEndpoint::new(endpoint.origin.clone()),
-                    });
-            }
-        }
+        self.rx_log.add(rx);
+    }
 
-        self.rx.add(rx);
+    #[allow(clippy::needless_pass_by_ref_mut)]
+    pub fn add_table_receiver(&mut self, rx: crossbeam::channel::Receiver<TableMsg>) {
+        // Make sure we wake up when a message is sent.
+        #[cfg(not(target_arch = "wasm32"))]
+        let rx = crate::wake_up_ui_thread_on_each_msg_crossbeam(rx, self.egui_ctx.clone());
+
+        self.rx_table.lock().push(rx);
     }
 
     pub fn msg_receive_set(&self) -> &ReceiveSet<LogMsg> {
-        &self.rx
+        &self.rx_log
     }
 
     /// Adds a new view class to the viewer.
@@ -552,7 +552,7 @@ impl App {
         // button in the browser, and there is still a connection downloading an .rrd.
         // That's the case of `SmartChannelSource::RrdHttpStream`.
         // TODO(emilk): exactly what things get kept and what gets cleared?
-        self.rx.retain(|r| match r.source() {
+        self.rx_log.retain(|r| match r.source() {
             SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => false,
 
             SmartChannelSource::JsChannel { .. }
@@ -565,13 +565,13 @@ impl App {
             }
 
             SystemCommand::ClearSourceAndItsStores(source) => {
-                self.rx.retain(|r| r.source() != &source);
+                self.rx_log.retain(|r| r.source() != &source);
                 store_hub.retain_recordings(|db| db.data_source.as_ref() != Some(&source));
             }
 
             SystemCommand::AddReceiver(rx) => {
                 re_log::debug!("Received AddReceiver");
-                self.add_receiver(rx);
+                self.add_log_receiver(rx);
             }
 
             SystemCommand::ChangeDisplayMode(display_mode) => {
@@ -612,7 +612,7 @@ impl App {
                 });
 
                 match data_source.stream(on_cmd, Some(waker)) {
-                    Ok(re_data_source::StreamSource::LogMessages(rx)) => self.add_receiver(rx),
+                    Ok(re_data_source::StreamSource::LogMessages(rx)) => self.add_log_receiver(rx),
                     Ok(re_data_source::StreamSource::CatalogData { endpoint }) => {
                         self.command_sender
                             .send_system(SystemCommand::AddRedapServer { endpoint });
@@ -1346,7 +1346,7 @@ impl App {
             &self.reflection,
             &self.component_ui_registry,
             &self.view_class_registry,
-            &self.rx,
+            &self.rx_log,
            &self.command_sender,
            &WelcomeScreenState {
                hide: self.startup_options.hide_welcome_screen,
@@ -1375,9 +1375,50 @@ impl App {
     fn receive_messages(&self, store_hub: &mut StoreHub, egui_ctx: &egui::Context) {
         re_tracing::profile_function!();
 
+        // TODO(grtlr): Should we bring back analytics for this too?
+        self.rx_table.lock().retain(|rx| match rx.try_recv() {
+            Ok(table) => {
+                match re_sorbet::SorbetBatch::try_from_record_batch(
+                    &table.data,
+                    re_sorbet::BatchType::Dataframe,
+                ) {
+                    Ok(sorbet_batch) => {
+                        // TODO(grtlr): For now we don't append anything to existing stores and always replace.
+                        let store = TableStore::default();
+                        store.add_batch(sorbet_batch);
+
+                        if store_hub.insert_table_store(table.id.clone(), store).is_some() {
+                            re_log::debug!("Overwrote table store with id: `{}`", table.id);
+                        } else {
+                            re_log::debug!("Inserted table store with id: `{}`", table.id);
+                        }
+                        store_hub.set_active_entry(table.id.clone().into());
+
+                        // Also select the newly added table:
+                        self.command_sender.send_system(SystemCommand::SetSelection(
+                            re_viewer_context::Item::TableId(table.id.clone()),
+                        ));
+
+                        // If the viewer is in the background, tell the user that it has received something new.
+                        egui_ctx.send_viewport_cmd(
+                            egui::ViewportCommand::RequestUserAttention(
+                                egui::UserAttentionType::Informational,
+                            ),
+                        );
+                    },
+                    Err(err) => {
+                        re_log::warn!("the received dataframe does not contain Sorbet-compliant batches: {err}");
+                    }
+                }
+
+                true
+            }
+            Err(crossbeam::channel::TryRecvError::Empty) => true,
+            Err(_) => false,
+        });
+
         let start = web_time::Instant::now();
 
-        while let Some((channel_source, msg)) = self.rx.try_recv() {
+        while let Some((channel_source, msg)) = self.rx_log.try_recv() {
             re_log::trace!("Received a message from {channel_source:?}"); // Used by `test_ui_wakeup` test app!
             let msg = match msg.payload {
@@ -1720,7 +1761,7 @@ impl App {
         // flickering quickly before receiving some data.
         // So: if we expect data very soon, we do a fade-in.
 
-        for source in self.rx.sources() {
+        for source in self.rx_log.sources() {
             #[allow(clippy::match_same_arms)]
             match &*source {
                 SmartChannelSource::File(_)
diff --git a/crates/viewer/re_viewer/src/lib.rs b/crates/viewer/re_viewer/src/lib.rs
index f0e6ebe8c2ca..27927729d74b 100644
--- a/crates/viewer/re_viewer/src/lib.rs
+++ b/crates/viewer/re_viewer/src/lib.rs
@@ -253,6 +253,31 @@ pub fn wake_up_ui_thread_on_each_msg(
     new_rx
 }
 
+/// This wakes up the ui thread each time we receive a new message.
+#[cfg(not(target_arch = "wasm32"))]
+pub fn wake_up_ui_thread_on_each_msg_crossbeam<T: Send + 'static>(
+    rx: crossbeam::channel::Receiver<T>,
+    ctx: egui::Context,
+) -> crossbeam::channel::Receiver<T> {
+    // We need to intercept messages to wake up the ui thread.
+    // For that, we need a new channel.
+    let (tx, new_rx) = crossbeam::channel::unbounded();
+    std::thread::Builder::new()
+        .name("ui_waker".to_owned())
+        .spawn(move || {
+            while let Ok(msg) = rx.recv() {
+                if tx.send(msg).is_ok() {
+                    ctx.request_repaint();
+                } else {
+                    break;
+                }
+            }
+            re_log::trace!("Shutting down ui_waker thread");
+        })
+        .expect("Failed to spawn UI waker thread");
+    new_rx
+}
+
 /// Reset the viewer state as stored on disk and local storage,
 /// keeping only the analytics state.
 #[allow(clippy::unnecessary_wraps)] // wasm only
diff --git a/crates/viewer/re_viewer/src/native.rs b/crates/viewer/re_viewer/src/native.rs
index fc816b459fd8..224b30e9bc98 100644
--- a/crates/viewer/re_viewer/src/native.rs
+++ b/crates/viewer/re_viewer/src/native.rs
@@ -110,7 +110,7 @@ pub fn run_native_viewer_with_messages(
                     cc,
                     async_runtime,
                 );
-                app.add_receiver(rx);
+                app.add_log_receiver(rx);
                 Box::new(app)
             }),
         force_wgpu_backend.as_deref(),
diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index 1df30c76aa6d..ed3a7814b5be 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -22,6 +22,11 @@ use crate::web_tools::{
 
 static GLOBAL: AccountingAllocator<std::alloc::System> = AccountingAllocator::new(std::alloc::System);
 
+struct Channel {
+    log_tx: re_smart_channel::Sender<re_log_types::LogMsg>,
+    table_tx: crossbeam::channel::Sender<re_log_types::TableMsg>,
+}
+
 #[wasm_bindgen]
 pub struct WebHandle {
     runner: eframe::WebRunner,
@@ -30,7 +35,7 @@ pub struct WebHandle {
     ///
     /// This exists because the direct bytes API is expected to submit many small RRD chunks
     /// and allocating a new tx pair for each chunk doesn't make sense.
-    tx_channels: HashMap<String, re_smart_channel::Sender<re_log_types::LogMsg>>,
+    tx_channels: HashMap<String, Channel>,
 
     app_options: AppOptions,
 }
@@ -187,7 +192,7 @@ impl WebHandle {
             url.to_owned(),
             app.command_sender.clone(),
         ) {
-            app.add_receiver(rx);
+            app.add_log_receiver(rx);
         }
     }
 
@@ -216,15 +221,18 @@ impl WebHandle {
             return;
         }
 
-        let (tx, rx) = re_smart_channel::smart_channel(
+        let (log_tx, log_rx) = re_smart_channel::smart_channel(
             re_smart_channel::SmartMessageSource::JsChannelPush,
             re_smart_channel::SmartChannelSource::JsChannel {
                 channel_name: channel_name.to_owned(),
             },
        );
+        let (table_tx, table_rx) = crossbeam::channel::unbounded();
 
-        app.add_receiver(rx);
-        self.tx_channels.insert(id.to_owned(), tx);
+        app.add_log_receiver(log_rx);
+        app.add_table_receiver(table_rx);
+        self.tx_channels
+            .insert(id.to_owned(), Channel { log_tx, table_tx });
     }
 
     /// Close an existing channel for streaming data.
@@ -236,8 +244,12 @@ impl WebHandle {
             return;
         };
 
-        if let Some(tx) = self.tx_channels.remove(id) {
-            tx.quit(None).warn_on_err_once("Failed to send quit marker");
+        if let Some(channel) = self.tx_channels.remove(id) {
+            channel
+                .log_tx
+                .quit(None)
+                .warn_on_err_once("Failed to send quit marker");
+            drop(channel.table_tx);
         }
 
         // Request a repaint since closing the channel may update the top bar.
@@ -253,7 +265,8 @@ impl WebHandle {
             return;
        };
 
-        if let Some(tx) = self.tx_channels.get(id).cloned() {
+        if let Some(channel) = self.tx_channels.get(id) {
+            let tx = channel.log_tx.clone();
             let data: Vec<u8> = data.to_vec();
 
             let egui_ctx = app.egui_ctx.clone();
@@ -293,6 +306,37 @@ impl WebHandle {
         }
     }
 
+    #[wasm_bindgen]
+    pub fn send_table_to_channel(&self, id: &str, data: &[u8]) {
+        let Some(app) = self.runner.app_mut::<crate::App>() else {
+            return;
+        };
+
+        if let Some(channel) = self.tx_channels.get(id) {
+            let tx = channel.table_tx.clone();
+
+            // TODO(grtlr): error handling
+            let cursor = std::io::Cursor::new(data);
+            let stream_reader = arrow::ipc::reader::StreamReader::try_new(cursor, None)
+                .expect("failed to create IPC stream reader");
+
+            let encoded = &stream_reader
+                .collect::<Result<Vec<_>, _>>()
+                .expect("could not read from IPC stream")[0];
+            let msg =
+                re_log_types::TableMsg::from_arrow_encoded(encoded).expect("msg decode failed");
+
+            let egui_ctx = app.egui_ctx.clone();
+
+            if tx.send(msg).is_ok() {
+                // TODO(jan): Is this enough to request a repaint?
+                egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
+            } else {
+                re_log::info_once!("Failed to dispatch table to viewer.");
+            }
+        }
+    }
+
     #[wasm_bindgen]
     pub fn get_active_recording_id(&self) -> Option<String> {
         let app = self.runner.app_mut::<crate::App>()?;
diff --git a/crates/viewer/re_viewer_context/src/store_hub.rs b/crates/viewer/re_viewer_context/src/store_hub.rs
index e50b23149996..179b97d23d00 100644
--- a/crates/viewer/re_viewer_context/src/store_hub.rs
+++ b/crates/viewer/re_viewer_context/src/store_hub.rs
@@ -331,6 +331,11 @@ impl StoreHub {
         self.store_bundle.insert(entity_db);
     }
 
+    /// Inserts a new table into the store (potentially overwriting an existing entry).
+    pub fn insert_table_store(&mut self, id: TableId, store: TableStore) -> Option<TableStore> {
+        self.table_stores.insert(id, store)
+    }
+
     fn remove_store(&mut self, store_id: &StoreId) {
         _ = self.caches_per_recording.remove(store_id);
         let removed_store = self.store_bundle.remove(store_id);
@@ -391,6 +396,7 @@ impl StoreHub {
     }
 
     /// Remove all open recordings and applications, and go to the welcome page.
+ // TODO: Make sure we also clear all tables pub fn clear_recordings(&mut self) { // Keep only the welcome screen: let mut store_ids_retained = HashSet::default(); diff --git a/examples/rust/custom_callback/src/viewer.rs b/examples/rust/custom_callback/src/viewer.rs index 7627da615802..b6cd8d095dfb 100644 --- a/examples/rust/custom_callback/src/viewer.rs +++ b/examples/rust/custom_callback/src/viewer.rs @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box> { // Listen for gRPC connections from Rerun's logging SDKs. // There are other ways of "feeding" the viewer though - all you need is a `re_smart_channel::Receiver`. - let rx = re_grpc_server::spawn_with_recv( + let (rx_log, rx_table) = re_grpc_server::spawn_with_recv( "0.0.0.0:9877".parse()?, "75%".parse()?, re_grpc_server::shutdown::never(), @@ -66,7 +66,8 @@ async fn main() -> Result<(), Box> { AsyncRuntimeHandle::from_current_tokio_runtime_or_wasmbindgen()?, ); - rerun_app.add_receiver(rx); + rerun_app.add_log_receiver(rx_log); + rerun_app.add_table_receiver(rx_table); Ok(Box::new(Control::new(rerun_app, handle))) }), diff --git a/examples/rust/custom_view/src/main.rs b/examples/rust/custom_view/src/main.rs index 930002206127..9886fe7659d7 100644 --- a/examples/rust/custom_view/src/main.rs +++ b/examples/rust/custom_view/src/main.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { // Listen for gRPC connections from Rerun's logging SDKs. // There are other ways of "feeding" the viewer though - all you need is a `re_smart_channel::Receiver`. - let rx = re_grpc_server::spawn_with_recv( + let (rx, _) = re_grpc_server::spawn_with_recv( "0.0.0.0:9876".parse()?, "75%".parse()?, re_grpc_server::shutdown::never(), @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { "Could not get a runtime handle from the current Tokio runtime or Wasm bindgen.", ), ); - app.add_receiver(rx); + app.add_log_receiver(rx); // Register the custom view app.add_view_class::() diff --git a/examples/rust/extend_viewer_ui/src/main.rs b/examples/rust/extend_viewer_ui/src/main.rs index 66667664024d..61daa10cab47 100644 --- a/examples/rust/extend_viewer_ui/src/main.rs +++ b/examples/rust/extend_viewer_ui/src/main.rs @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box> { // Listen for gRPC connections from Rerun's logging SDKs. // There are other ways of "feeding" the viewer though - all you need is a `re_smart_channel::Receiver`. - let rx = re_grpc_server::spawn_with_recv( + let (rx, _) = re_grpc_server::spawn_with_recv( "0.0.0.0:9876".parse()?, "75%".parse()?, re_grpc_server::shutdown::never(), @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box> { cc, AsyncRuntimeHandle::from_current_tokio_runtime_or_wasmbindgen()?, ); - rerun_app.add_receiver(rx); + rerun_app.add_log_receiver(rx); Ok(Box::new(MyApp { rerun_app })) }), )?; diff --git a/examples/rust/viewer_callbacks/src/main.rs b/examples/rust/viewer_callbacks/src/main.rs index a04f7035bab6..2c201ff822b5 100644 --- a/examples/rust/viewer_callbacks/src/main.rs +++ b/examples/rust/viewer_callbacks/src/main.rs @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box> { // Listen for gRPC connections from Rerun's logging SDKs. // There are other ways of "feeding" the viewer though - all you need is a `re_smart_channel::Receiver`. 
- let rx = re_grpc_server::spawn_with_recv( + let (rx, _) = re_grpc_server::spawn_with_recv( "0.0.0.0:9876".parse()?, "75%".parse()?, re_grpc_server::shutdown::never(), @@ -85,7 +85,7 @@ async fn main() -> Result<(), Box> { cc, AsyncRuntimeHandle::from_current_tokio_runtime_or_wasmbindgen()?, ); - rerun_app.add_receiver(rx); + rerun_app.add_log_receiver(rx); Ok(Box::new(MyApp { rerun_app, shared_state, diff --git a/rerun_js/web-viewer/index.ts b/rerun_js/web-viewer/index.ts index 75b0f1f08c7b..e46549962e2a 100644 --- a/rerun_js/web-viewer/index.ts +++ b/rerun_js/web-viewer/index.ts @@ -498,6 +498,21 @@ export class WebViewer { } }; + const on_send_table = (/** @type {Uint8Array} */ data: Uint8Array) => { + if (!this.#handle) { + throw new Error( + `attempted to send data through channel \"${channel_name}\" to a stopped web viewer`, + ); + } + + try { + this.#handle.send_table_to_channel(id, data); + } catch (e) { + this.stop(); + throw e; + } + } + const on_close = () => { if (!this.#handle) { throw new Error( @@ -515,7 +530,7 @@ export class WebViewer { const get_state = () => this.#state; - return new LogChannel(on_send, on_close, get_state); + return new LogChannel(on_send, on_send_table, on_close, get_state); } /** @@ -794,6 +809,7 @@ export class WebViewer { export class LogChannel { #on_send; + #on_send_table; #on_close; #get_state; #closed = false; @@ -805,10 +821,12 @@ export class LogChannel { */ constructor( on_send: (data: Uint8Array) => void, + on_send_table: (data: Uint8Array) => void, on_close: () => void, get_state: () => "ready" | "starting" | "stopped", ) { this.#on_send = on_send; + this.#on_send_table = on_send_table; this.#on_close = on_close; this.#get_state = get_state; } @@ -829,6 +847,11 @@ export class LogChannel { this.#on_send(rrd_bytes); } + send_table(table_bytes: Uint8Array) { + if (!this.ready) return; + this.#on_send_table(table_bytes) + } + /** * Close the channel. 
* diff --git a/rerun_notebook/src/js/widget.ts b/rerun_notebook/src/js/widget.ts index a68f7f9617a7..08159148f49f 100644 --- a/rerun_notebook/src/js/widget.ts +++ b/rerun_notebook/src/js/widget.ts @@ -150,6 +150,10 @@ class ViewerWidget { if (!this.channel) throw new Error("on_custom_message called before channel init"); this.channel.send_rrd(new Uint8Array(buffers[0].buffer)); + } else if (msg?.type === "table") { + if (!this.channel) + throw new Error("on_custom_message called before channel init") + this.channel.send_table(new Uint8Array(buffers[0].buffer)); } else { console.log("unknown message type", msg, buffers); } diff --git a/rerun_notebook/src/rerun_notebook/__init__.py b/rerun_notebook/src/rerun_notebook/__init__.py index d4a1f65591f4..4d71ae91444e 100644 --- a/rerun_notebook/src/rerun_notebook/__init__.py +++ b/rerun_notebook/src/rerun_notebook/__init__.py @@ -161,6 +161,7 @@ class Viewer(anywidget.AnyWidget): # type: ignore[misc] _ready = False _data_queue: list[bytes] + _table_queue: list[bytes] _time_ctrl = traitlets.Tuple( traitlets.Unicode(allow_none=True), @@ -218,10 +219,15 @@ def handle_msg(widget: Any, content: Any, buffers: list[bytes]) -> None: def _on_ready(self) -> None: self._ready = True + for data in self._data_queue: self.send_rrd(data) self._data_queue.clear() + for data in self._table_queue: + self.send_table(data) + self._table_queue.clear() + def send_rrd(self, data: bytes) -> None: """Send a recording to the viewer.""" @@ -231,6 +237,13 @@ def send_rrd(self, data: bytes) -> None: self.send({"type": "rrd"}, buffers=[data]) + def send_table(self, data: bytes) -> None: + if not self._ready: + self._table_queue.append(data) + return + + self.send({"type": "table"}, buffers=[data]) + def block_until_ready(self, timeout: float = 5.0) -> None: """Block until the viewer is ready.""" diff --git a/rerun_py/rerun_sdk/rerun/notebook.py b/rerun_py/rerun_sdk/rerun/notebook.py index 275b0347ddc4..a0c5bab7b1a5 100644 --- a/rerun_py/rerun_sdk/rerun/notebook.py +++ b/rerun_py/rerun_sdk/rerun/notebook.py @@ -8,6 +8,9 @@ from typing import TYPE_CHECKING, Any, Literal import numpy as np +from pyarrow import RecordBatch +import pyarrow +import pyarrow.ipc as ipc from .error_utils import deprecated_param from .time import to_nanos, to_nanos_since_epoch @@ -38,7 +41,7 @@ from rerun import bindings -from .recording_stream import RecordingStream, get_data_recording +from .recording_stream import RecordingStream, get_data_recording, get_global_data_recording _default_width = 640 _default_height = 480 @@ -212,6 +215,27 @@ def add_recording( if blueprint is not None: recording.send_blueprint(blueprint) + def _add_table_id(self, record_batch: RecordBatch, table_id: str): + # Get current schema + schema = record_batch.schema + schema = schema.with_metadata({b"__table_id": table_id}) + + # Create new record batch with updated schema + return pyarrow.RecordBatch.from_arrays(record_batch.columns, schema=schema) + + def send_table( + self, + id: str, + table: RecordBatch, + ) -> None: + new_table = self._add_table_id(table, id) + sink = pyarrow.BufferOutputStream() + writer = ipc.new_stream(sink, new_table.schema) + writer.write_batch(new_table) + writer.close() + table_as_bytes = sink.getvalue().to_pybytes() + self._viewer.send_table(table_as_bytes) + def display(self, block_until_ready: bool = True) -> None: """ Display the viewer in the notebook cell immediately. 
From 91ff3c671520421e40a2d56f2f9f5180e7cf7bf4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 09:58:31 +0200
Subject: [PATCH 02/19] Move notebook-specific Arrow-encoding

---
 crates/store/re_log_types/src/lib.rs | 94 +--------------------------
 crates/viewer/re_viewer/src/web.rs   | 96 ++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+), 93 deletions(-)

diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index 9079efb91113..068f505cda4c 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -650,49 +650,6 @@ pub struct TableMsg {
     pub data: ArrowRecordBatch,
 }
 
-impl TableMsg {
-    /// Returns the [`TableMsg`] encoded as a record batch.
-    // This is required to send bytes to a viewer running in a notebook.
-    // If you ever change this, you also need to adapt `notebook.py` too.
-    pub fn to_arrow_encoded(&self) -> Result<ArrowRecordBatch, ArrowError> {
-        let current_schema = self.data.schema();
-        let mut metadata = current_schema.metadata().clone();
-        metadata.insert("__table_id".to_owned(), self.id.as_str().to_owned());
-
-        // Create a new schema with the updated metadata
-        let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
-            current_schema.fields().clone(),
-            metadata,
-        ));
-
-        // Create a new record batch with the same data but updated schema
-        ArrowRecordBatch::try_new(new_schema, self.data.columns().to_vec())
-    }
-
-    /// Returns the [`TableMsg`] back from a encoded record batch.
-    // This is required to send bytes around in the notebook.
-    // If you ever change this, you also need to adapt `notebook.py` too.
-    pub fn from_arrow_encoded(data: &ArrowRecordBatch) -> Option<Self> {
-        re_log::info!("{:?}", data);
-        let mut metadata = data.schema().metadata().clone();
-        let id = metadata.remove("__table_id").expect("this has to be here");
-
-        let data = ArrowRecordBatch::try_new(
-            Arc::new(arrow::datatypes::Schema::new_with_metadata(
-                data.schema().fields().clone(),
-                metadata,
-            )),
-            data.columns().to_vec(),
-        )
-        .ok()?;
-
-        Some(Self {
-            id: TableId::new(id),
-            data,
-        })
-    }
-}
-
 // ---
 
 /// Build a ([`Timeline`], [`TimeInt`]) tuple from `log_time` suitable for inserting in a [`TimePoint`].
@@ -845,7 +802,7 @@ impl SizeBytes for LogMsg {
 /// USE ONLY FOR TESTS
 // TODO(#3741): remove once is released
-use arrow::{array::RecordBatch as ArrowRecordBatch, datatypes::Field, error::ArrowError};
+use arrow::array::RecordBatch as ArrowRecordBatch;
 
 pub fn strip_arrow_extension_types_from_batch(batch: &mut ArrowRecordBatch) {
     use arrow::datatypes::{Field, Schema};
@@ -967,53 +924,4 @@ mod tests {
             }
         );
     }
-
-    #[test]
-    fn table_msg_concatenated_roundtrip() {
-        use arrow::{
-            array::{ArrayRef, StringArray, UInt64Array},
-            datatypes::{DataType, Field, Schema},
-        };
-
-        let data = {
-            let schema = Arc::new(Schema::new_with_metadata(
-                vec![
-                    Field::new("id", DataType::UInt64, false),
-                    Field::new("name", DataType::Utf8, false),
-                ],
-                Default::default(),
-            ));
-
-            // Create a UInt64 array
-            let id_array = UInt64Array::from(vec![1, 2, 3, 4, 5]);
-
-            // Create a String array
-            let name_array = StringArray::from(vec![
-                "Alice",
-                "Bob",
-                "Charlie",
-                "Dave",
-                "http://www.rerun.io",
-            ]);
-
-            // Convert arrays to ArrayRef (trait objects)
-            let arrays: Vec<ArrayRef> = vec![
-                Arc::new(id_array) as ArrayRef,
-                Arc::new(name_array) as ArrayRef,
-            ];
-
-            // Create a RecordBatch
-            ArrowRecordBatch::try_new(schema, arrays).unwrap()
-        };
-
-        let msg = TableMsg {
-            id: TableId::new("test123".to_owned()),
-            data,
-        };
-
-        let encoded = msg.to_arrow_encoded().expect("to encoded failed");
-        let decoded = TableMsg::from_arrow_encoded(&encoded).expect("from concatenated failed");
-
-        assert_eq!(msg, decoded);
-    }
 }
diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index ed3a7814b5be..461fda6a8a62 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -3,6 +3,7 @@
 #![allow(clippy::mem_forget)] // False positives from #[wasm_bindgen] macro
 
 use ahash::HashMap;
+use arrow::{array::RecordBatch, error::ArrowErros};
 use serde::Deserialize;
 use std::rc::Rc;
 use std::str::FromStr as _;
@@ -888,3 +889,98 @@ pub fn set_email(email: String) {
     config.opt_in_metadata.insert("email".into(), email.into());
     config.save().unwrap();
 }
+
+/// Returns the [`TableMsg`] encoded as a record batch.
+// This is required to send bytes to a viewer running in a notebook.
+// If you ever change this, you also need to adapt `notebook.py` too.
+pub fn to_arrow_encoded(&table: TableMsg) -> Result<ArrowRecordBatch, ArrowError> {
+    let current_schema = self.data.schema();
+    let mut metadata = current_schema.metadata().clone();
+    metadata.insert("__table_id".to_owned(), self.id.as_str().to_owned());
+
+    // Create a new schema with the updated metadata
+    let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
+        current_schema.fields().clone(),
+        metadata,
+    ));
+
+    // Create a new record batch with the same data but updated schema
+    ArrowRecordBatch::try_new(new_schema, self.data.columns().to_vec())
+}
+
+/// Returns the [`TableMsg`] back from an encoded record batch.
+// This is required to send bytes around in the notebook.
+// If you ever change this, you also need to adapt `notebook.py` too.
+pub fn from_arrow_encoded(data: &ArrowRecordBatch) -> Option<TableMsg> {
+    re_log::info!("{:?}", data);
+    let mut metadata = data.schema().metadata().clone();
+    let id = metadata.remove("__table_id").expect("this has to be here");
+
+    let data = ArrowRecordBatch::try_new(
+        Arc::new(arrow::datatypes::Schema::new_with_metadata(
+            data.schema().fields().clone(),
+            metadata,
+        )),
+        data.columns().to_vec(),
+    )
+    .ok()?;
+
+    Some(Self {
+        id: TableId::new(id),
+        data,
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn table_msg_encoded_roundtrip() {
+        use arrow::{
+            array::{ArrayRef, StringArray, UInt64Array},
+            datatypes::{DataType, Field, Schema},
+        };
+
+        let data = {
+            let schema = Arc::new(Schema::new_with_metadata(
+                vec![
+                    Field::new("id", DataType::UInt64, false),
+                    Field::new("name", DataType::Utf8, false),
+                ],
+                Default::default(),
+            ));
+
+            // Create a UInt64 array
+            let id_array = UInt64Array::from(vec![1, 2, 3, 4, 5]);
+
+            // Create a String array
+            let name_array = StringArray::from(vec![
+                "Alice",
+                "Bob",
+                "Charlie",
+                "Dave",
+                "http://www.rerun.io",
+            ]);
+
+            // Convert arrays to ArrayRef (trait objects)
+            let arrays: Vec<ArrayRef> = vec![
+                Arc::new(id_array) as ArrayRef,
+                Arc::new(name_array) as ArrayRef,
+            ];
+
+            // Create a RecordBatch
+            ArrowRecordBatch::try_new(schema, arrays).unwrap()
+        };
+
+        let msg = TableMsg {
+            id: TableId::new("test123".to_owned()),
+            data,
+        };
+
+        let encoded = to_arrow_encoded(&msg).expect("to encoded failed");
+        let decoded = from_arrow_encoded(&encoded).expect("from concatenated failed");
+
+        assert_eq!(msg, decoded);
+    }
+}

From 95f6c7cac6da7a2462bc5f6d537c6a239eff7c17 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 10:00:01 +0200
Subject: [PATCH 03/19] Remove `info!` message

---
 crates/top/rerun/src/commands/entrypoint.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs
index 992f9d4e3da8..44b4d808a680 100644
--- a/crates/top/rerun/src/commands/entrypoint.rs
+++ b/crates/top/rerun/src/commands/entrypoint.rs
@@ -987,8 +987,6 @@ fn run_impl(
         app.add_log_receiver(rx);
     }
     for rx in rxs_table {
-        // TODO: remove!
-        re_log::info!("Added table receiver");
         app.add_table_receiver(rx);
     }
     app.set_profiler(profiler);

From 256e49146adbbd1ba2a0c4f1ece97aa4cabce62c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 10:14:05 +0200
Subject: [PATCH 04/19] missed file

---
 crates/viewer/re_viewer/src/app_state.rs | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/crates/viewer/re_viewer/src/app_state.rs b/crates/viewer/re_viewer/src/app_state.rs
index fc886b56f4ab..c277ba0e5608 100644
--- a/crates/viewer/re_viewer/src/app_state.rs
+++ b/crates/viewer/re_viewer/src/app_state.rs
@@ -155,7 +155,7 @@ impl AppState {
         reflection: &re_types_core::reflection::Reflection,
         component_ui_registry: &ComponentUiRegistry,
         view_class_registry: &ViewClassRegistry,
-        rx: &ReceiveSet<LogMsg>,
+        rx_log: &ReceiveSet<LogMsg>,
         command_sender: &CommandSender,
         welcome_screen_state: &WelcomeScreenState,
         is_history_enabled: bool,
@@ -309,7 +309,7 @@ impl AppState {
 
         // We move the time at the very start of the frame,
         // so that we always show the latest data when we're in "follow" mode.
-        move_time(&ctx, recording, rx, callbacks);
+        move_time(&ctx, recording, rx_log, callbacks);
 
         // Update the viewport. May spawn new views and handle queued requests (like screenshots).
         viewport_ui.on_frame_start(&ctx);
 
@@ -424,14 +424,20 @@ impl AppState {
                     .show_inside(ui, |ui| {
                         recordings_panel_ui(
                             &ctx,
-                            rx,
+                            rx_log,
                             ui,
                             welcome_screen_state,
                             redap_servers,
                         );
                     });
             } else {
-                recordings_panel_ui(&ctx, rx, ui, welcome_screen_state, redap_servers);
+                recordings_panel_ui(
+                    &ctx,
+                    rx_log,
+                    ui,
+                    welcome_screen_state,
+                    redap_servers,
+                );
             }
 
             ui.add_space(4.0);
@@ -598,7 +604,7 @@ impl AppState {
                     .show_inside(ui, |ui| {
                         recordings_panel_ui(
                             &ctx,
-                            rx,
+                            rx_log,
                             ui,
                             welcome_screen_state,
                             redap_servers,
@@ -607,7 +613,7 @@ impl AppState {
             } else {
                 recordings_panel_ui(
                     &ctx,
-                    rx,
+                    rx_log,
                     ui,
                     welcome_screen_state,
                     redap_servers,

From 4a210c13688d8aa969a2f4b3177ed4988b7462b6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 10:32:30 +0200
Subject: [PATCH 05/19] lints

---
 .../proto/rerun/v1alpha1/sdk_comms.proto |  1 -
 crates/viewer/re_viewer/src/web.rs       | 15 ++++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto b/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto
index 54d183aad1f2..dfb93e13a5a8 100644
--- a/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto
+++ b/crates/store/re_protos/proto/rerun/v1alpha1/sdk_comms.proto
@@ -57,4 +57,3 @@ message ReadTablesResponse {
   rerun.common.v1alpha1.TableId id = 1;
   rerun.common.v1alpha1.DataframePart data = 2;
 }
-
diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index 461fda6a8a62..819ab27fb04b 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -7,9 +7,11 @@
 use arrow::{array::RecordBatch, error::ArrowErros};
 use serde::Deserialize;
 use std::rc::Rc;
 use std::str::FromStr as _;
+use std::sync::Arc;
 use wasm_bindgen::prelude::*;
 
 use re_log::ResultExt as _;
+use re_log_types::{TableId, TableMsg};
 use re_memory::AccountingAllocator;
 use re_viewer_context::{AsyncRuntimeHandle, SystemCommand, SystemCommandSender as _};
 
@@ -324,8 +326,7 @@ impl WebHandle {
         let encoded = &stream_reader
             .collect::<Result<Vec<_>, _>>()
             .expect("could not read from IPC stream")[0];
-        let msg =
-            re_log_types::TableMsg::from_arrow_encoded(encoded).expect("msg decode failed");
+        let msg = from_arrow_encoded(encoded).expect("msg decode failed");
 
         let egui_ctx = app.egui_ctx.clone();
 
@@ -893,10 +894,10 @@ pub fn set_email(email: String) {
 
 /// Returns the [`TableMsg`] encoded as a record batch.
 // This is required to send bytes to a viewer running in a notebook.
 // If you ever change this, you also need to adapt `notebook.py` too.
-pub fn to_arrow_encoded(&table: TableMsg) -> Result<ArrowRecordBatch, ArrowError> {
-    let current_schema = self.data.schema();
+pub fn to_arrow_encoded(&table: TableMsg) -> Result<RecordBatch, ArrowError> {
+    let current_schema = table.data.schema();
     let mut metadata = current_schema.metadata().clone();
-    metadata.insert("__table_id".to_owned(), self.id.as_str().to_owned());
+    metadata.insert("__table_id".to_owned(), table.id.as_str().to_owned());
 
     // Create a new schema with the updated metadata
     let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
@@ -905,7 +906,7 @@ pub fn to_arrow_encoded(&table: TableMsg) -> Result<ArrowRecordBatch, ArrowError> {
 pub fn from_arrow_encoded(data: &ArrowRecordBatch) -> Option<TableMsg> {
     let mut metadata = data.schema().metadata().clone();
     let id = metadata.remove("__table_id").expect("this has to be here");
-    let data = ArrowRecordBatch::try_new(
+    let data = RecordBatch::try_new(
         Arc::new(arrow::datatypes::Schema::new_with_metadata(
             data.schema().fields().clone(),
             metadata,

From c11fa6b91a60d61612aed23f1f441087e5fb0a74 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 10:43:33 +0200
Subject: [PATCH 06/19] More fixes

---
 crates/viewer/re_viewer/src/app.rs            |  2 +-
 crates/viewer/re_viewer/src/web.rs            | 29 +++++++++++--------
 .../viewer/re_viewer_context/src/store_hub.rs |  2 +-
 rerun_py/rerun_sdk/rerun/notebook.py          |  2 +-
 4 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs
index 599d79466d99..1ade05148e8f 100644
--- a/crates/viewer/re_viewer/src/app.rs
+++ b/crates/viewer/re_viewer/src/app.rs
@@ -1375,7 +1375,7 @@ impl App {
     fn receive_messages(&self, store_hub: &mut StoreHub, egui_ctx: &egui::Context) {
         re_tracing::profile_function!();
 
-        // TODO: Should we bring back analytics for this too?
+        // TODO(grtlr): Should we bring back analytics for this too?
         self.rx_table.lock().retain(|rx| match rx.try_recv() {
             Ok(table) => {
diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index 819ab27fb04b..4c5aba9267a1 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -3,7 +3,7 @@
 #![allow(clippy::mem_forget)] // False positives from #[wasm_bindgen] macro
 
 use ahash::HashMap;
-use arrow::{array::RecordBatch, error::ArrowErros};
+use arrow::{array::RecordBatch, error::ArrowError};
 use serde::Deserialize;
 use std::rc::Rc;
 use std::str::FromStr as _;
@@ -318,20 +318,25 @@ impl WebHandle {
         if let Some(channel) = self.tx_channels.get(id) {
             let tx = channel.table_tx.clone();
 
-            // TODO: error handling
             let cursor = std::io::Cursor::new(data);
-            let stream_reader = arrow::ipc::reader::StreamReader::try_new(cursor, None)
-                .expect("failed to create cursor");
+            let stream_reader = arrow::ipc::reader::StreamReader::try_new(cursor, None) else {
+                re_log::error_once!("Failed to create cursor");
+                return;
+            };
 
-            let encoded = &stream_reader
-                .collect::<Result<Vec<_>, _>>()
-                .expect("could not read from IPC stream")[0];
-            let msg = from_arrow_encoded(encoded).expect("msg decode failed");
+            let encoded = &stream_reader.collect::<Result<Vec<_>, _>>() else {
+                re_log::error_once!("Could not read from IPC stream");
+                return;
+            };
+
+            let msg = from_arrow_encoded(encoded[0]) else {
+                re_log::error_once!("Failed to decode Arrow message");
+                return;
+            };
 
             let egui_ctx = app.egui_ctx.clone();
 
             if tx.send(msg).is_ok() {
-                // TODO(jan): Is this enough to request a repaint?
                 egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
             } else {
                 re_log::info_once!("Failed to dispatch log message to viewer.");
             }
         }
     }
@@ -899,7 +899,7 @@ pub fn set_email(email: String) {
 /// Returns the [`TableMsg`] encoded as a record batch.
 // This is required to send bytes to a viewer running in a notebook.
 // If you ever change this, you also need to adapt `notebook.py` too.
-pub fn to_arrow_encoded(&table: TableMsg) -> Result<RecordBatch, ArrowError> {
+pub fn to_arrow_encoded(table: &TableMsg) -> Result<RecordBatch, ArrowError> {
     let current_schema = table.data.schema();
     let mut metadata = current_schema.metadata().clone();
     metadata.insert("__table_id".to_owned(), table.id.as_str().to_owned());
@@ -912,7 +917,7 @@ pub fn to_arrow_encoded(&table: TableMsg) -> Result<RecordBatch, ArrowError> {
 /// Returns the [`TableMsg`] back from an encoded record batch.
 // This is required to send bytes around in the notebook.
 // If you ever change this, you also need to adapt `notebook.py` too.
-pub fn from_arrow_encoded(data: &ArrowRecordBatch) -> Option<TableMsg> {
+pub fn from_arrow_encoded(data: &RecordBatch) -> Option<TableMsg> {
     re_log::info!("{:?}", data);
     let mut metadata = data.schema().metadata().clone();
     let id = metadata.remove("__table_id").expect("this has to be here");
@@ -926,7 +931,7 @@ pub fn from_arrow_encoded(data: &ArrowRecordBatch) -> Option<TableMsg> {
     )
     .ok()?;
 
-    Some(Self {
+    Some(TableMsg {
         id: TableId::new(id),
         data,
     })
diff --git a/crates/viewer/re_viewer_context/src/store_hub.rs b/crates/viewer/re_viewer_context/src/store_hub.rs
index 179b97d23d00..fee30c056810 100644
--- a/crates/viewer/re_viewer_context/src/store_hub.rs
+++ b/crates/viewer/re_viewer_context/src/store_hub.rs
@@ -396,7 +396,7 @@ impl StoreHub {
     }
 
     /// Remove all open recordings and applications, and go to the welcome page.
-    // TODO: Make sure we also clear all tables
+    // TODO(grtlr): Make sure we also clear all tables
     pub fn clear_recordings(&mut self) {
         // Keep only the welcome screen:
         let mut store_ids_retained = HashSet::default();
diff --git a/rerun_py/rerun_sdk/rerun/notebook.py b/rerun_py/rerun_sdk/rerun/notebook.py
index a0c5bab7b1a5..91ca5c411ab3 100644
--- a/rerun_py/rerun_sdk/rerun/notebook.py
+++ b/rerun_py/rerun_sdk/rerun/notebook.py
@@ -41,7 +41,7 @@
 
 from rerun import bindings
 
-from .recording_stream import RecordingStream, get_data_recording, get_global_data_recording
+from .recording_stream import RecordingStream, get_data_recording
 
 _default_width = 640
 _default_height = 480

From 1c94fed3e0f514e2bf22b003b07af3fb841379b9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 10:48:16 +0200
Subject: [PATCH 07/19] fmt

---
 rerun_py/rerun_sdk/rerun/notebook.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rerun_py/rerun_sdk/rerun/notebook.py b/rerun_py/rerun_sdk/rerun/notebook.py
index 91ca5c411ab3..4cea08ca1ae6 100644
--- a/rerun_py/rerun_sdk/rerun/notebook.py
+++ b/rerun_py/rerun_sdk/rerun/notebook.py
@@ -8,9 +8,9 @@
 from typing import TYPE_CHECKING, Any, Literal
 
 import numpy as np
-from pyarrow import RecordBatch
 import pyarrow
 import pyarrow.ipc as ipc
+from pyarrow import RecordBatch
 
 from .error_utils import deprecated_param
 from .time import to_nanos, to_nanos_since_epoch

From 6ebc99ca2c1ecc442ff7715c89b760b6b9ce8ea0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 11:20:36 +0200
Subject: [PATCH 08/19] improve comments

---
 crates/store/re_grpc_server/src/lib.rs | 2 +-
 crates/store/re_log_types/src/lib.rs   | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs
index 7fd19643c9c2..a2f56623f4f3 100644
--- a/crates/store/re_grpc_server/src/lib.rs
+++ b/crates/store/re_grpc_server/src/lib.rs
@@ -316,7 +316,7 @@ pub fn spawn_with_recv(
                     }),
                     Err(broadcast::error::RecvError::Closed) => {
                         re_log::debug!("message proxy server shut down, closing receiver");
-                        // We don't have to explicitly call `quit` on crossbeam channels.
+                        // `crossbeam` does not have a `quit` method, so we're done here.
                         break;
                     }
                     Err(broadcast::error::RecvError::Lagged(n)) => {
diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index 068f505cda4c..49768523c638 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -640,6 +640,10 @@ impl std::fmt::Display for StoreSource {
 
 // ---
 
+/// A table, encoded as a dataframe of Arrow record batches.
+///
+/// Tables have a [`TableId`], but don't belong to an application and therefore don't have an [`ApplicationId`].
+/// For now, the table is always sent as a whole, i.e. tables can't be streamed.
 #[must_use]
 #[derive(Clone, Debug, PartialEq)]
 pub struct TableMsg {

From 4b5d842e48f3686a4a3b0909b9c586712a169b2d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 11:36:52 +0200
Subject: [PATCH 09/19] fix viewer compilation

---
 crates/viewer/re_viewer/src/web.rs | 45 +++++++++++++++---------------
 1 file changed, 23 insertions(+), 22 deletions(-)

diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index 4c5aba9267a1..26f01385675c 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -3,7 +3,7 @@
 #![allow(clippy::mem_forget)] // False positives from #[wasm_bindgen] macro
 
 use ahash::HashMap;
-use arrow::{array::RecordBatch, error::ArrowError};
+use arrow::array::RecordBatch;
 use serde::Deserialize;
 use std::rc::Rc;
 use std::str::FromStr as _;
@@ -319,17 +319,17 @@ impl WebHandle {
             let tx = channel.table_tx.clone();
 
             let cursor = std::io::Cursor::new(data);
-            let stream_reader = arrow::ipc::reader::StreamReader::try_new(cursor, None) else {
+            let Ok(stream_reader) = arrow::ipc::reader::StreamReader::try_new(cursor, None) else {
                 re_log::error_once!("Failed to create cursor");
                 return;
             };
 
-            let encoded = &stream_reader.collect::<Result<Vec<_>, _>>() else {
+            let Ok(encoded) = &stream_reader.collect::<Result<Vec<_>, _>>() else {
                 re_log::error_once!("Could not read from IPC stream");
                 return;
             };
 
-            let msg = from_arrow_encoded(encoded[0]) else {
+            let Some(msg) = from_arrow_encoded(&encoded[0]) else {
                 re_log::error_once!("Failed to decode Arrow message");
                 return;
             };
 
             let egui_ctx = app.egui_ctx.clone();
@@ -896,24 +896,6 @@ pub fn set_email(email: String) {
     config.save().unwrap();
 }
 
-/// Returns the [`TableMsg`] encoded as a record batch.
-// This is required to send bytes to a viewer running in a notebook.
-// If you ever change this, you also need to adapt `notebook.py` too.
-pub fn to_arrow_encoded(table: &TableMsg) -> Result<RecordBatch, ArrowError> {
-    let current_schema = table.data.schema();
-    let mut metadata = current_schema.metadata().clone();
-    metadata.insert("__table_id".to_owned(), table.id.as_str().to_owned());
-
-    // Create a new schema with the updated metadata
-    let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
-        current_schema.fields().clone(),
-        metadata,
-    ));
-
-    // Create a new record batch with the same data but updated schema
-    RecordBatch::try_new(new_schema, table.data.columns().to_vec())
-}
-
 /// Returns the [`TableMsg`] back from an encoded record batch.
 // This is required to send bytes around in the notebook.
 // If you ever change this, you also need to adapt `notebook.py` too.
@@ -940,6 +922,25 @@ pub fn from_arrow_encoded(data: &RecordBatch) -> Option<TableMsg> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::error::ArrowError;
+
+    /// Returns the [`TableMsg`] encoded as a record batch.
+    // This is required to send bytes to a viewer running in a notebook.
+    // If you ever change this, you also need to adapt `notebook.py` too.
+    pub fn to_arrow_encoded(table: &TableMsg) -> Result<RecordBatch, ArrowError> {
+        let current_schema = table.data.schema();
+        let mut metadata = current_schema.metadata().clone();
+        metadata.insert("__table_id".to_owned(), table.id.as_str().to_owned());
+
+        // Create a new schema with the updated metadata
+        let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
+            current_schema.fields().clone(),
+            metadata,
+        ));
+
+        // Create a new record batch with the same data but updated schema
+        RecordBatch::try_new(new_schema, table.data.columns().to_vec())
+    }
 
     #[test]
     fn table_msg_encoded_roundtrip() {

From 60bb1b6a0b98e92384ae35e5eb4f13634fecf43e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 11:41:52 +0200
Subject: [PATCH 10/19] undo deleted code due to merge

---
 crates/viewer/re_viewer/src/app.rs | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs
index 1ade05148e8f..175d7cd71636 100644
--- a/crates/viewer/re_viewer/src/app.rs
+++ b/crates/viewer/re_viewer/src/app.rs
@@ -469,6 +469,18 @@ impl App {
         #[cfg(not(target_arch = "wasm32"))]
         let rx = crate::wake_up_ui_thread_on_each_msg(rx, self.egui_ctx.clone());
 
+        // Add unknown redap servers.
+        //
+        // Otherwise we end up in a situation where we have data from an unknown server,
+        // which is unnecessary and can get us into a strange ui state.
+        if let SmartChannelSource::RedapGrpcStream(endpoint) = rx.source() {
+            if !self.state.redap_servers.has_server(&endpoint.origin) {
+                self.command_sender
+                    .send_system(SystemCommand::AddRedapServer {
+                        endpoint: re_uri::CatalogEndpoint::new(endpoint.origin.clone()),
+                    });
+            }
+        }
 
         self.rx_log.add(rx);
     }

From 4d7abf7a069819c822ec2898d9c8d9a8253b42ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 12:23:12 +0200
Subject: [PATCH 11/19] More feedback

---
 crates/store/re_grpc_server/src/lib.rs | 12 +++----
 crates/store/re_log_types/src/lib.rs   |  1 +
 crates/viewer/re_viewer/src/web.rs     | 49 ++++++++++++++++----------
 rerun_py/rerun_sdk/rerun/notebook.py   | 12 +++++++
 4 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs
index a2f56623f4f3..ea0e2ed3281c 100644
--- a/crates/store/re_grpc_server/src/lib.rs
+++ b/crates/store/re_grpc_server/src/lib.rs
@@ -285,7 +285,7 @@ pub fn spawn_with_recv(
                         break;
                     }
                     Err(broadcast::error::RecvError::Lagged(n)) => {
-                        re_log::debug!(
+                        re_log::warn!(
                             "message proxy receiver dropped {n} messages due to backpressure"
                         );
                         continue;
@@ -320,7 +320,7 @@ pub fn spawn_with_recv(
                         break;
                     }
                     Err(broadcast::error::RecvError::Lagged(n)) => {
-                        re_log::debug!(
+                        re_log::warn!(
                             "message proxy receiver dropped {n} messages due to backpressure"
                         );
                         continue;
@@ -634,13 +634,13 @@ impl MessageProxy {
    async fn new_client_message_stream(&self) -> ReadMessagesStream {
        let (sender, receiver) = oneshot::channel();
        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
-            re_log::error!("Error initializing new client: {err}");
+            re_log::error!("Error accepting new client: {err}");
            return Box::pin(tokio_stream::empty());
        };
        let (history, log_channel, _) = match receiver.await {
            Ok(v) => v,
            Err(err) => {
-                re_log::error!("Error initializing new client: {err}");
+                re_log::error!("Error accepting new client: {err}");
                return Box::pin(tokio_stream::empty());
            }
        };
@@ -676,13 +676,13 @@ impl MessageProxy {
    async fn new_client_table_stream(&self) -> ReadTablesStream {
        let (sender, receiver) = oneshot::channel();
        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
-            re_log::error!("Error initializing new client: {err}");
+            re_log::error!("Error accepting new client: {err}");
            return Box::pin(tokio_stream::empty());
        };
        let (history, _, table_channel) = match receiver.await {
            Ok(v) => v,
            Err(err) => {
-                re_log::error!("Error initializing new client: {err}");
+                re_log::error!("Error accepting new client: {err}");
                return Box::pin(tokio_stream::empty());
            }
        };
diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index 49768523c638..f4b50b453712 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -201,6 +201,7 @@ impl std::fmt::Display for ApplicationId {
 
 // ----------------------------------------------------------------------------
 
+/// Either the user-chosen name of a table, or an id that is created by the catalog server.
 #[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
 pub struct TableId(Arc<str>);
diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index 26f01385675c..cfd81e25ccd9 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -319,28 +319,38 @@ impl WebHandle {
             let tx = channel.table_tx.clone();
 
             let cursor = std::io::Cursor::new(data);
-            let Ok(stream_reader) = arrow::ipc::reader::StreamReader::try_new(cursor, None) else {
-                re_log::error_once!("Failed to create cursor");
-                return;
+            let stream_reader = match arrow::ipc::reader::StreamReader::try_new(cursor, None) {
+                Ok(stream_reader) => stream_reader,
+                Err(err) => {
+                    re_log::error_once!("Failed to interpret data as IPC-encoded arrow: {err}");
+                    return;
+                }
             };
 
-            let Ok(encoded) = &stream_reader.collect::<Result<Vec<_>, _>>() else {
-                re_log::error_once!("Could not read from IPC stream");
-                return;
+            let encoded = match stream_reader.collect::<Result<Vec<_>, _>>() {
+                Ok(encoded) => encoded,
+                Err(err) => {
+                    re_log::error_once!("Could not read from IPC stream: {err}");
+                    return;
+                }
             };
 
-            let Some(msg) = from_arrow_encoded(&encoded[0]) else {
-                re_log::error_once!("Failed to decode Arrow message");
-                return;
+            let msg = match from_arrow_encoded(&encoded[0]) {
+                Ok(msg) => msg,
+                Err(err) => {
+                    re_log::error_once!("Failed to decode Arrow message: {err}");
+                    return;
+                }
             };
 
             let egui_ctx = app.egui_ctx.clone();
 
-            if tx.send(msg).is_ok() {
-                egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
-            } else {
-                re_log::info_once!("Failed to dispatch log message to viewer.");
-            }
+            match tx.send(msg) {
+                Ok(_) => egui_ctx.request_repaint_after(std::time::Duration::from_millis(10)),
+                Err(err) => {
+                    re_log::info_once!("Failed to dispatch log message to viewer: {err}");
+                }
+            };
         }
     }
@@ -899,10 +909,12 @@ pub fn set_email(email: String) {
 /// Returns the [`TableMsg`] back from an encoded record batch.
 // This is required to send bytes around in the notebook.
 // If you ever change this, you also need to adapt `notebook.py` too.
-pub fn from_arrow_encoded(data: &RecordBatch) -> Option<TableMsg> {
+pub fn from_arrow_encoded(data: &RecordBatch) -> Result<TableMsg, Box<dyn std::error::Error>> {
     re_log::info!("{:?}", data);
     let mut metadata = data.schema().metadata().clone();
-    let id = metadata.remove("__table_id").expect("this has to be here");
+    let id = metadata
+        .remove("__table_id")
+        .ok_or("encoded record batch is missing `__table_id` metadata.")?;
 
     let data = RecordBatch::try_new(
         Arc::new(arrow::datatypes::Schema::new_with_metadata(
@@ -910,7 +922,9 @@ pub fn from_arrow_encoded(data: &RecordBatch) -> Option<TableMsg> {
             metadata,
         )),
         data.columns().to_vec(),
-    )
-    .ok()?;
+    )?;
 
-    Some(TableMsg {
+    Ok(TableMsg {
         id: TableId::new(id),
         data,
     })
diff --git a/rerun_py/rerun_sdk/rerun/notebook.py b/rerun_py/rerun_sdk/rerun/notebook.py
index 4cea08ca1ae6..c4e832e1d8a4 100644
--- a/rerun_py/rerun_sdk/rerun/notebook.py
+++ b/rerun_py/rerun_sdk/rerun/notebook.py
@@ -228,6 +228,18 @@ def send_table(
         id: str,
         table: RecordBatch,
     ) -> None:
+        """
+        Sends a table in the form of a dataframe to the viewer.
+
+        Parameters
+        ----------
+        id : str
+            The name that uniquely identifies the table in the viewer.
+            This name will also be shown in the recording panel.
+        table : RecordBatch
+            The table as a single Arrow record batch.
+
+        """

From 39fe54d7d51aef81e12a4af88eeb251ea5d33038 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 12:37:24 +0200
Subject: [PATCH 12/19] Improve documentation

---
 crates/store/re_log_types/src/lib.rs | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index f4b50b453712..f94ec08f394b 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -288,6 +288,10 @@ impl BlueprintActivationCommand {
 }
 
 /// The most general log message sent from the SDK to the server.
+///
+/// Note: this does not contain tables sent via [`TableMsg`], as these concepts are fundamentally
+/// different and should not be handled uniformly. For example, we don't want to store tables in
+/// `.rrd` files.
 #[must_use]
 #[derive(Clone, Debug, PartialEq)] // `PartialEq` used for tests in another crate
 #[allow(clippy::large_enum_variant)]
@@ -645,6 +649,10 @@ impl std::fmt::Display for StoreSource {
 ///
 /// Tables have a [`TableId`], but don't belong to an application and therefore don't have an [`ApplicationId`].
 /// For now, the table is always sent as a whole, i.e. tables can't be streamed.
+///
+/// It's important to note that tables are not sent via the smart channel of [`LogMsg`], but use a seprate `crossbeam`
+/// channel. The reasoning behind this is that tables are fundamentally different from recordings. For example,
+/// we don't want to store tables in `.rrd` files, as there are much better formats out there.
 #[must_use]
 #[derive(Clone, Debug, PartialEq)]
 pub struct TableMsg {

From c90ff37a3397a8f9f7ec9a02480b42d398ea6d63 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 12:44:04 +0200
Subject: [PATCH 13/19] more lints

---
 crates/store/re_log_types/src/lib.rs | 2 +-
 rerun_py/rerun_sdk/rerun/notebook.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index f94ec08f394b..cab60357ddcb 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -650,7 +650,7 @@ impl std::fmt::Display for StoreSource {
 /// Tables have a [`TableId`], but don't belong to an application and therefore don't have an [`ApplicationId`].
 /// For now, the table is always sent as a whole, i.e. tables can't be streamed.
 ///
-/// It's important to note that tables are not sent via the smart channel of [`LogMsg`], but use a seprate `crossbeam`
+/// It's important to note that tables are not sent via the smart channel of [`LogMsg`], but use a separate `crossbeam`
 /// channel. The reasoning behind this is that tables are fundamentally different from recordings. For example,
 /// we don't want to store tables in `.rrd` files, as there are much better formats out there.
 #[must_use]
 #[derive(Clone, Debug, PartialEq)]
 pub struct TableMsg {
diff --git a/rerun_py/rerun_sdk/rerun/notebook.py b/rerun_py/rerun_sdk/rerun/notebook.py
index c4e832e1d8a4..b8499206e77b 100644
--- a/rerun_py/rerun_sdk/rerun/notebook.py
+++ b/rerun_py/rerun_sdk/rerun/notebook.py
@@ -215,13 +215,13 @@ def add_recording(
         if blueprint is not None:
             recording.send_blueprint(blueprint)
 
-    def _add_table_id(self, record_batch: RecordBatch, table_id: str):
+    def _add_table_id(self, record_batch: RecordBatch, table_id: str) -> RecordBatch:
         # Get current schema
         schema = record_batch.schema
         schema = schema.with_metadata({b"__table_id": table_id})
 
         # Create new record batch with updated schema
-        return pyarrow.RecordBatch.from_arrays(record_batch.columns, schema=schema)
+        return RecordBatch.from_arrays(record_batch.columns, schema=schema)
 
     def send_table(
         self,

From a7c51f728507f46b5697a5c088849773273060c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 12:52:47 +0200
Subject: [PATCH 14/19] destructuring

---
 crates/viewer/re_viewer/src/web.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/crates/viewer/re_viewer/src/web.rs b/crates/viewer/re_viewer/src/web.rs
index cfd81e25ccd9..922f5904f388 100644
--- a/crates/viewer/re_viewer/src/web.rs
+++ b/crates/viewer/re_viewer/src/web.rs
@@ -247,12 +247,11 @@ impl WebHandle {
             return;
         };
 
-        if let Some(channel) = self.tx_channels.remove(id) {
-            channel
-                .log_tx
+        if let Some(Channel { log_tx, table_tx }) = self.tx_channels.remove(id) {
+            log_tx
                 .quit(None)
                 .warn_on_err_once("Failed to send quit marker");
-            drop(channel.table_tx);
+            drop(table_tx);
        }
 
        // Request a repaint since closing the channel may update the top bar.

From 47584bccd98e57cfc1b675b8472a2f36ec8ed164 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 13:12:29 +0200
Subject: [PATCH 15/19] Rename `CloseAllRecordings` to `CloseAllEntries`

And actually do that.

---
 crates/viewer/re_viewer/src/app.rs                         | 6 +++---
 .../re_viewer_context/src/global_context/command_sender.rs | 2 +-
 crates/viewer/re_viewer_context/src/store_hub.rs           | 5 +++--
 crates/viewer/re_viewer_context/src/test_context.rs        | 2 +-
 4 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs
index 279dc99068d1..97b938615ee4 100644
--- a/crates/viewer/re_viewer/src/app.rs
+++ b/crates/viewer/re_viewer/src/app.rs
@@ -554,8 +554,8 @@ impl App {
                     store_hub.remove(&entry);
                 }
 
-                SystemCommand::CloseAllRecordings => {
-                    store_hub.clear_recordings();
+                SystemCommand::CloseAllEntries => {
+                    store_hub.clear_entries();
 
                     // Stop receiving into the old recordings.
                     // This is most important when going back to the example screen by using the "Back"
@@ -866,7 +866,7 @@ impl App {
             }
             UICommand::CloseAllRecordings => {
                 self.command_sender
-                    .send_system(SystemCommand::CloseAllRecordings);
+                    .send_system(SystemCommand::CloseAllEntries);
             }
 
             UICommand::Undo => {
diff --git a/crates/viewer/re_viewer_context/src/global_context/command_sender.rs b/crates/viewer/re_viewer_context/src/global_context/command_sender.rs
index 8e83d6028d28..5c30a54adb4d 100644
--- a/crates/viewer/re_viewer_context/src/global_context/command_sender.rs
+++ b/crates/viewer/re_viewer_context/src/global_context/command_sender.rs
@@ -62,7 +62,7 @@ pub enum SystemCommand {
     CloseEntry(StoreHubEntry),
 
     /// Close all stores and show the welcome screen again.
-    CloseAllRecordings,
+    CloseAllEntries,
 
     /// Update the blueprint with additional data
     ///
diff --git a/crates/viewer/re_viewer_context/src/store_hub.rs b/crates/viewer/re_viewer_context/src/store_hub.rs
index 546d52e84d0d..5f09dba59944 100644
--- a/crates/viewer/re_viewer_context/src/store_hub.rs
+++ b/crates/viewer/re_viewer_context/src/store_hub.rs
@@ -396,8 +396,7 @@ impl StoreHub {
     }
 
     /// Remove all open recordings and applications, and go to the welcome page.
-    // TODO(grtlr): Make sure we also clear all tables
-    pub fn clear_recordings(&mut self) {
+    pub fn clear_entries(&mut self) {
         // Keep only the welcome screen:
         let mut store_ids_retained = HashSet::default();
         self.store_bundle.retain(|db| {
@@ -411,6 +410,8 @@ impl StoreHub {
         self.caches_per_recording
             .retain(|store_id, _| store_ids_retained.contains(store_id));
 
+        self.table_stores.clear();
+
         self.active_entry = None;
         self.active_application_id = Some(Self::welcome_screen_app_id());
     }
diff --git a/crates/viewer/re_viewer_context/src/test_context.rs b/crates/viewer/re_viewer_context/src/test_context.rs
index 9e79d75355fb..293799ff90dc 100644
--- a/crates/viewer/re_viewer_context/src/test_context.rs
+++ b/crates/viewer/re_viewer_context/src/test_context.rs
@@ -489,7 +489,7 @@ impl TestContext {
                 | SystemCommand::AddRedapServer { .. }
                 | SystemCommand::UndoBlueprint { .. }
                 | SystemCommand::RedoBlueprint { .. }
-                | SystemCommand::CloseAllRecordings
+                | SystemCommand::CloseAllEntries
                 | SystemCommand::SetLoopSelection { .. } => handled = false,
 
                 #[cfg(debug_assertions)]

From 1d8afce6970a62cc1de4c4023b93fb2c49e6fc18 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 13:22:41 +0200
Subject: [PATCH 16/19] Use `re_size_bytes::SizeBytes`

---
 crates/store/re_grpc_server/src/lib.rs | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs
index ea0e2ed3281c..5ca77e98ad9a 100644
--- a/crates/store/re_grpc_server/src/lib.rs
+++ b/crates/store/re_grpc_server/src/lib.rs
@@ -6,6 +6,7 @@ use std::collections::VecDeque;
 use std::net::SocketAddr;
 use std::pin::Pin;
 
+use re_byte_size::SizeBytes;
 use re_log_encoding::codec::wire::decoder::Decode as _;
 use re_log_types::TableMsg;
 use re_protos::sdk_comms::v1alpha1::ReadTablesRequest;
@@ -24,7 +25,6 @@ use tonic::transport::server::TcpIncoming;
 use tonic::transport::Server;
 use tower_http::cors::CorsLayer;
 
-use re_byte_size::SizeBytes as _;
 use re_memory::MemoryLimit;
 use re_protos::{
     common::v1alpha1::{
@@ -377,8 +377,8 @@ enum Msg {
 impl Msg {
     fn total_size_bytes(&self) -> u64 {
         match self {
-            Self::LogMsg(log_msg) => message_size(log_msg),
-            Self::Table(table) => table_size(table),
+            Self::LogMsg(log_msg) => log_msg.total_size_bytes(),
+            Self::Table(table) => table.total_size_bytes(),
         }
     }
 }
@@ -509,7 +509,7 @@ impl EventLoop {
             // Recording data
             Msg::ArrowMsg(..) => {
-                let approx_size_bytes = message_size(&msg);
+                let approx_size_bytes = msg.total_size_bytes();
                 self.ordered_message_bytes += approx_size_bytes;
                 self.ordered_message_queue.push_back(msg.into());
             }
@@ -521,7 +521,7 @@ impl EventLoop {
 
         self.gc_if_using_too_much_ram();
 
-        let approx_size_bytes = table_size(&table);
+        let approx_size_bytes = table.total_size_bytes();
         self.ordered_message_bytes += approx_size_bytes;
         self.ordered_message_queue.push_back(Msg::Table(table));
     }
@@ -570,13 +570,11 @@ impl EventLoop {
     }
 }
 
-fn message_size(msg: &LogMsgProto) -> u64 {
-    msg.total_size_bytes()
-}
-
-fn table_size(table: &TableMsgProto) -> u64 {
-    let TableMsgProto { id, data } = table;
-    id.total_size_bytes() + data.total_size_bytes()
+impl SizeBytes for TableMsgProto {
+    fn heap_size_bytes(&self) -> u64 {
+        let Self { id, data } = self;
+        id.heap_size_bytes() + data.heap_size_bytes()
+    }
 }

From b6702873efaa50ec2687da8bd7102c27f680c875 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 13:25:28 +0200
Subject: [PATCH 17/19] Missed renaming

---
 crates/viewer/re_ui/src/command.rs     | 6 +++---
 crates/viewer/re_viewer/src/app.rs     | 2 +-
 crates/viewer/re_viewer/src/history.rs | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/crates/viewer/re_ui/src/command.rs b/crates/viewer/re_ui/src/command.rs
index c84a8c6da365..b1f72a56f571 100644
--- a/crates/viewer/re_ui/src/command.rs
+++ b/crates/viewer/re_ui/src/command.rs
@@ -20,7 +20,7 @@ pub enum UICommand {
     SaveRecordingSelection,
     SaveBlueprint,
     CloseCurrentRecording,
-    CloseAllRecordings,
+    CloseAllEntries,
 
     Undo,
     Redo,
@@ -139,7 +139,7 @@ impl UICommand {
                 "Close the current recording (unsaved data will be lost)",
             ),
 
-            Self::CloseAllRecordings => (
+            Self::CloseAllEntries => (
                 "Close all recordings",
                 "Close all open current recording (unsaved data will be lost)",
             ),
@@ -340,7 +340,7 @@ impl UICommand {
             Self::Open => smallvec![cmd(Key::O)],
             Self::Import => smallvec![cmd_shift(Key::O)],
             Self::CloseCurrentRecording => smallvec![],
-            Self::CloseAllRecordings => smallvec![],
+            Self::CloseAllEntries => smallvec![],
 
             Self::Undo => smallvec![cmd(Key::Z)],
             Self::Redo => {
diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs
index 97b938615ee4..8bef29dbfc45 100644
--- a/crates/viewer/re_viewer/src/app.rs
+++ b/crates/viewer/re_viewer/src/app.rs
@@ -864,7 +864,7 @@ impl App {
                     .send_system(SystemCommand::CloseEntry(cur_rec.clone().into()));
                 }
             }
-            UICommand::CloseAllRecordings => {
+            UICommand::CloseAllEntries => {
                 self.command_sender
                     .send_system(SystemCommand::CloseAllEntries);
             }
diff --git a/crates/viewer/re_viewer/src/history.rs b/crates/viewer/re_viewer/src/history.rs
index 7df91c56e148..3e4291fd0804 100644
--- a/crates/viewer/re_viewer/src/history.rs
+++ b/crates/viewer/re_viewer/src/history.rs
@@ -152,7 +152,7 @@ fn handle_popstate(
     // the user navigated back to the history entry where the viewer was initially opened
     // in that case they likely expect to land back at the welcome screen.
     // We just close all recordings, which should automatically show the welcome screen or the redap browser.
-    command_sender.send_system(SystemCommand::CloseAllRecordings);
+    command_sender.send_system(SystemCommand::CloseAllEntries);
 
     set_stored_history_entry(new_state);
     egui_ctx.request_repaint();

From 4a23b7e056d709e587202674b4d4e0303af33fc6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 13:34:09 +0200
Subject: [PATCH 18/19] Update rerun_py/rerun_sdk/rerun/notebook.py

Co-authored-by: Emil Ernerfeldt

---
 rerun_py/rerun_sdk/rerun/notebook.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rerun_py/rerun_sdk/rerun/notebook.py b/rerun_py/rerun_sdk/rerun/notebook.py
index b8499206e77b..bdbfdd7af339 100644
--- a/rerun_py/rerun_sdk/rerun/notebook.py
+++ b/rerun_py/rerun_sdk/rerun/notebook.py
@@ -233,10 +233,10 @@ def send_table(
 
         Parameters
         ----------
-        id : str
+        id:
             The name that uniquely identifies the table in the viewer.
             This name will also be shown in the recording panel.
-        table : RecordBatch
+        table:
             The table as a single Arrow record batch.
 
         """

From 53b0a82e685cf982d87f04b4ee2eed07591709a3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jochen=20G=C3=B6rtler?=
Date: Mon, 7 Apr 2025 13:54:56 +0200
Subject: [PATCH 19/19] fix unused variable

---
 crates/top/rerun/src/commands/entrypoint.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs
index 44b4d808a680..ba4c61da1d75 100644
--- a/crates/top/rerun/src/commands/entrypoint.rs
+++ b/crates/top/rerun/src/commands/entrypoint.rs
@@ -841,7 +841,7 @@ fn run_impl(
     }
 
     if !cfg!(feature = "server") {
-        _ = (call_source, rxs_log);
+        _ = (call_source, rxs_log, rxs_table);
         anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature");
     }
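
Taken together, the series gives tables their own path from the SDK to the viewer, deliberately separate from the `LogMsg` channel. A hypothetical end-to-end notebook session built on the new API could look like the sketch below (illustrative only: the `Viewer` import path and the table id are assumptions, not taken from this diff; only `send_table(id, table)` itself is established by the series):

import pyarrow as pa
from rerun.notebook import Viewer  # assumed import path, not shown in this diff

viewer = Viewer()
viewer.display()

# `send_table` stamps the batch with the given id (via `_add_table_id`),
# IPC-encodes it, and ships it through the notebook widget channel.
batch = pa.RecordBatch.from_pydict({"id": [1, 2], "name": ["a", "b"]})
viewer.send_table("my_table", batch)
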