Skip to content

Add serve_grpc API #9447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
828624d
add grpc server sink to re_sdk
jprochazk Apr 1, 2025
9516f02
expose serve_grpc from `RecordingStream`
jprochazk Apr 1, 2025
44b5223
add python api
jprochazk Apr 1, 2025
2f4437b
add C/C++ api
jprochazk Apr 1, 2025
8b3b0c1
fix sdk compile
jprochazk Apr 1, 2025
9c61bb6
add `--serve-grpc` CLI option
jprochazk Apr 1, 2025
d9d7a15
hide grpc server behind feature flag
jprochazk Apr 1, 2025
2ef929c
Merge branch 'main' into jan/spawn-headless
jprochazk Apr 1, 2025
bdbe265
use server feature in cpp
jprochazk Apr 1, 2025
79eb24f
fix links
jprochazk Apr 1, 2025
85e0ded
Merge branch 'main' into jan/spawn-headless
jprochazk Apr 1, 2025
c6d9a9e
fix lint
jprochazk Apr 1, 2025
0304de9
fix py lint
jprochazk Apr 1, 2025
e92cee9
fix links again
jprochazk Apr 1, 2025
452259b
fix lints in rerun_c
jprochazk Apr 1, 2025
ec43eaa
add additionally needed windows system library link dependencies
Wumpf Apr 1, 2025
5481ca0
fix python sdk export
jprochazk Apr 2, 2025
243a5dc
update docs
jprochazk Apr 2, 2025
d47dea3
add basic cpp sanity check
jprochazk Apr 2, 2025
c4be7e9
flush before shutdown
jprochazk Apr 2, 2025
cdf7c88
require serve_grpc_opts for setting memory limit
jprochazk Apr 2, 2025
3f5391c
Merge branch 'main' into jan/spawn-headless
jprochazk Apr 2, 2025
256e7ed
update operating mode docs
jprochazk Apr 2, 2025
ec63746
update py index
jprochazk Apr 2, 2025
ddf8b9b
speculative links
jprochazk Apr 2, 2025
b1f440f
improve missing scheme message
jprochazk Apr 2, 2025
5a4cb4a
Merge branch 'main' into jan/spawn-headless
jprochazk Apr 2, 2025
7bdcbc7
improve connect error message
jprochazk Apr 2, 2025
716188d
connection failed message
jprochazk Apr 2, 2025
6907012
Update wording
jprochazk Apr 2, 2025
a293330
use DEFAULT_SERVER_PORT
jprochazk Apr 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl std::error::Error for TonicStatusError {
pub enum StreamError {
/// Native connection error
#[cfg(not(target_arch = "wasm32"))]
#[error(transparent)]
#[error("connection failed: {0}")]
Transport(#[from] tonic::transport::Error),

#[error(transparent)]
Expand Down
7 changes: 6 additions & 1 deletion crates/store/re_grpc_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ async fn serve_impl(
let incoming =
TcpIncoming::from_listener(tcp_listener, true, None).expect("failed to init listener");

re_log::info!("Listening for gRPC connections on {addr}");
let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
} else {
format!("rerun+http://{addr}/proxy")
};
re_log::info!("Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`");
Comment on lines +76 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice


let cors = CorsLayer::very_permissive();
let grpc_web = tonic_web::GrpcWebLayer::new();
Expand Down
2 changes: 2 additions & 0 deletions crates/top/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ web_viewer = [
"dep:webbrowser",
]

server = ["dep:re_smart_channel", "dep:tokio"]


[dependencies]
re_build_info.workspace = true
Expand Down
79 changes: 79 additions & 0 deletions crates/top/re_sdk/src/grpc_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use re_log_types::LogMsg;

/// A [`crate::sink::LogSink`] tied to a hosted Rerun gRPC server.
///
/// The hosted gRPC server may be connected to by any SDK or Viewer.
///
/// All data sent through this sink is immediately redirected to the gRPC server.
pub struct GrpcServerSink {
    /// Sender to send messages to the gRPC server.
    sender: re_smart_channel::Sender<LogMsg>,

    /// The gRPC server thread.
    ///
    /// Held only to tie the thread to this sink's lifetime; it is never joined
    /// (see `Drop`, which signals shutdown instead).
    _server_handle: std::thread::JoinHandle<()>,

    /// Signal used to request a graceful shutdown of the gRPC server (fired on drop).
    server_shutdown_signal: re_grpc_server::shutdown::Signal,
}

impl GrpcServerSink {
    /// Spawns a gRPC server bound to `{bind_ip}:{grpc_port}` and returns a sink
    /// that forwards all log messages to it.
    ///
    /// A `bind_ip` of `"0.0.0.0"` is a good default.
    ///
    /// # Errors
    ///
    /// Fails if `{bind_ip}:{grpc_port}` does not parse as a socket address.
    pub fn new(
        bind_ip: &str,
        grpc_port: u16,
        server_memory_limit: re_memory::MemoryLimit,
    ) -> Result<Self, std::net::AddrParseError> {
        // Signal pair used to request a graceful server shutdown (see `Drop`).
        let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();

        let server_addr = format!("{bind_ip}:{grpc_port}").parse()?;

        // Channel through which the sink hands log messages over to the server.
        let (tx, rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
            re_smart_channel::SmartMessageSource::MessageProxy {
                url: format!("rerun+http://{server_addr}/proxy"),
            },
            re_smart_channel::SmartChannelSource::Sdk,
        );

        // The server runs on its own thread with a dedicated single-threaded tokio
        // runtime, so it works regardless of whether the caller already runs inside
        // a tokio runtime of their own (e.g. `#[tokio::main]`).
        let spawn_result = std::thread::Builder::new()
            .name("message_proxy_server".to_owned())
            .spawn(move || {
                let mut builder = tokio::runtime::Builder::new_current_thread();
                builder.enable_all();
                let runtime = builder.build().expect("failed to build tokio runtime");

                runtime.block_on(re_grpc_server::serve_from_channel(
                    server_addr,
                    server_memory_limit,
                    shutdown,
                    rx,
                ));
            });
        let server_handle =
            spawn_result.expect("failed to spawn thread for message proxy server");

        Ok(Self {
            sender: tx,
            _server_handle: server_handle,
            server_shutdown_signal,
        })
    }
}

impl crate::sink::LogSink for GrpcServerSink {
    /// Forwards `msg` to the gRPC server; reports a failure (once) instead of panicking.
    fn send(&self, msg: LogMsg) {
        match self.sender.send(msg) {
            Ok(_) => {}
            Err(err) => {
                re_log::error_once!("Failed to send log message to gRPC server: {err}");
            }
        }
    }

    /// Blocks until all queued messages have been handed to the server channel.
    #[inline]
    fn flush_blocking(&self) {
        match self.sender.flush_blocking() {
            Ok(_) => {}
            Err(err) => {
                re_log::error_once!("Failed to flush: {err}");
            }
        }
    }
}

impl Drop for GrpcServerSink {
    fn drop(&mut self) {
        // Best-effort flush of any still-queued messages; an error here is
        // deliberately ignored since nothing useful can be done during drop.
        let _ = self.sender.flush_blocking();
        // Ask the server thread to shut down gracefully.
        self.server_shutdown_signal.stop();
    }
}
4 changes: 4 additions & 0 deletions crates/top/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ pub use re_data_loader::{DataLoader, DataLoaderError, DataLoaderSettings, Loaded
#[cfg(feature = "web_viewer")]
pub mod web_viewer;

/// Method for spawning a gRPC server and streaming the SDK log stream to it.
#[cfg(feature = "server")]
pub mod grpc_server;

/// Re-exports of other crates.
pub mod external {
pub use re_grpc_client;
Expand Down
104 changes: 104 additions & 0 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ pub enum RecordingStreamError {
/// Invalid endpoint
#[error("not a `/proxy` endpoint")]
NotAProxyEndpoint,

/// Invalid bind IP.
#[error(transparent)]
InvalidAddress(#[from] std::net::AddrParseError),
}

/// Results that can occur when creating/manipulating a [`RecordingStream`].
Expand Down Expand Up @@ -401,6 +405,63 @@ impl RecordingStreamBuilder {
}
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// locally hosted gRPC server.
///
/// The server is hosted on the default IP and port, and may be connected to by any SDK or
/// Viewer at `rerun+http://127.0.0.1:9876/proxy`.
///
/// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
///
/// The gRPC server buffers data in memory so that late-connecting viewers still receive all
/// of it. The amount buffered can be limited via [`Self::serve_grpc_opts`] and its
/// `server_memory_limit` argument; once the limit is reached, the earliest logged data is
/// dropped. Static data is never dropped.
#[cfg(feature = "server")]
pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
    // Defaults: listen on all interfaces, on the standard port, and cap the
    // server's in-memory buffer at 75% of total system RAM.
    self.serve_grpc_opts(
        "0.0.0.0",
        crate::DEFAULT_SERVER_PORT,
        re_memory::MemoryLimit::from_fraction_of_total(0.75),
    )
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// locally hosted gRPC server.
///
/// The server is hosted on the given `bind_ip` and `port`, and may be connected to by any SDK
/// or Viewer at `rerun+http://{bind_ip}:{port}/proxy`.
///
/// `0.0.0.0` is a good default for `bind_ip`.
///
/// The gRPC server buffers all log data in memory so that late-connecting viewers still
/// receive all of it. Use `server_memory_limit` to cap how much is buffered; once reached,
/// the earliest logged data is dropped. Static data is never dropped.
#[cfg(feature = "server")]
pub fn serve_grpc_opts(
    self,
    bind_ip: impl AsRef<str>,
    port: u16,
    server_memory_limit: re_memory::MemoryLimit,
) -> RecordingStreamResult<RecordingStream> {
    let (enabled, store_info, properties, batcher_config) = self.into_args();

    // Guard clause: with logging disabled, hand back an inert stream and do
    // not spawn any server.
    if !enabled {
        re_log::debug!("Rerun disabled - call to serve_grpc() ignored");
        return Ok(RecordingStream::disabled());
    }

    let sink =
        crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_memory_limit)?;
    RecordingStream::new(store_info, properties, batcher_config, Box::new(sink))
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
/// RRD file on disk.
///
Expand Down Expand Up @@ -1819,6 +1880,49 @@ impl RecordingStream {
Ok(())
}

/// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to
/// listen on `rerun+http://127.0.0.1:9876/proxy`.
///
/// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
///
/// The gRPC server buffers all log data in memory so that late-connecting viewers still
/// receive all of it. Use `server_memory_limit` to cap how much is buffered; once reached,
/// the earliest logged data is dropped. Static data is never dropped.
#[cfg(feature = "server")]
pub fn serve_grpc(
    &self,
    server_memory_limit: re_memory::MemoryLimit,
) -> RecordingStreamResult<()> {
    // Delegate to the configurable variant with the default bind address & port.
    self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_memory_limit)
}

/// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to
/// listen on `rerun+http://{bind_ip}:{port}/proxy`.
///
/// `0.0.0.0` is a good default for `bind_ip`.
///
/// The gRPC server buffers all log data in memory so that late-connecting viewers still
/// receive all of it. Use `server_memory_limit` to cap how much is buffered; once reached,
/// the earliest logged data is dropped. Static data is never dropped.
#[cfg(feature = "server")]
pub fn serve_grpc_opts(
    &self,
    bind_ip: impl AsRef<str>,
    port: u16,
    server_memory_limit: re_memory::MemoryLimit,
) -> RecordingStreamResult<()> {
    // A forced file sink (set via env var) takes precedence over any
    // programmatically requested sink.
    if forced_sink_path().is_some() {
        re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
        return Ok(());
    }

    self.set_sink(Box::new(crate::grpc_server::GrpcServerSink::new(
        bind_ip.as_ref(),
        port,
        server_memory_limit,
    )?));
    Ok(())
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
/// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
/// new process.
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ run = [
]

## Support for running a gRPC server that listens to incoming log messages from a Rerun SDK.
server = ["dep:re_grpc_server"]
server = ["dep:re_grpc_server", "re_sdk/server", "tokio/signal"]

## Embed the Rerun SDK & built-in types and re-export all of their public symbols.
sdk = ["dep:re_sdk", "dep:re_types"]
Expand Down
64 changes: 56 additions & 8 deletions crates/top/rerun/src/commands/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ Examples:
Open an .rrd file and stream it to a Web Viewer:
rerun recording.rrd --web-viewer

Host a Rerun gRPC server which listens for incoming connections from the logging SDK, buffer the log messages, and serves the results:
Host a Rerun gRPC server which listens for incoming connections from the logging SDK, buffer the log messages, and serve the results:
rerun --serve-web

Host a Rerun Server which serves a recording from a file over gRPC to any connecting Rerun Viewers:
rerun --serve-web recording.rrd

Host a Rerun gRPC server without spawning a Viewer:
rerun --serve-grpc

Spawn a Viewer without also hosting a gRPC server:
rerun --connect

Connect to a Rerun Server:
rerun rerun+http://localhost:9877/proxy

Expand Down Expand Up @@ -158,6 +164,13 @@ When persisted, the state will be stored at the following locations:
#[clap(long)]
serve_web: bool,

/// This will host a gRPC server.
///
/// The server will act like a proxy, listening for incoming connections from
/// logging SDKs, and forwarding it to Rerun viewers.
#[clap(long)]
serve_grpc: bool,

/// Do not attempt to start a new server, instead try to connect to an existing one.
///
/// Optionally accepts an HTTP(S) URL to a gRPC server.
Expand Down Expand Up @@ -638,7 +651,7 @@ fn run_impl(
_build_info: re_build_info::BuildInfo,
call_source: CallSource,
args: Args,
_tokio_runtime_handle: &tokio::runtime::Handle,
tokio_runtime_handle: &tokio::runtime::Handle,
) -> anyhow::Result<()> {
#[cfg(feature = "native_viewer")]
let profiler = run_profiler(&args);
Expand All @@ -660,8 +673,11 @@ fn run_impl(
re_viewer::StartupOptions {
hide_welcome_screen: args.hide_welcome_screen,
detach_process: args.detach_process,
memory_limit: re_memory::MemoryLimit::parse(&args.memory_limit)
.map_err(|err| anyhow::format_err!("Bad --memory-limit: {err}"))?,
memory_limit: {
re_log::debug!("Parsing memory limit for Viewer");
re_memory::MemoryLimit::parse(&args.memory_limit)
.map_err(|err| anyhow::format_err!("Bad --memory-limit: {err}"))?
},
persist_state: args.persist_state,
is_in_notebook: false,
screenshot_to_path_then_quit: args.screenshot_to.clone(),
Expand Down Expand Up @@ -690,8 +706,11 @@ fn run_impl(
#[cfg(feature = "server")]
let server_addr = std::net::SocketAddr::new(args.bind, args.port);
#[cfg(feature = "server")]
let server_memory_limit = re_memory::MemoryLimit::parse(&args.server_memory_limit)
.map_err(|err| anyhow::format_err!("Bad --server-memory-limit: {err}"))?;
let server_memory_limit = {
re_log::debug!("Parsing memory limit for gRPC server");
re_memory::MemoryLimit::parse(&args.server_memory_limit)
.map_err(|err| anyhow::format_err!("Bad --server-memory-limit: {err}"))?
};

#[allow(unused_variables)]
let (command_sender, command_receiver) = re_viewer_context::command_channel();
Expand Down Expand Up @@ -782,7 +801,7 @@ fn run_impl(
// we want all receivers to push their data to the server.
// For that we spawn the server a bit further down, after we've collected
// all receivers into `rxs`.
} else if !args.serve && !args.serve_web {
} else if !args.serve && !args.serve_web && !args.serve_grpc {
let server: Receiver<LogMsg> = re_grpc_server::spawn_with_recv(
server_addr,
server_memory_limit,
Expand Down Expand Up @@ -811,6 +830,35 @@ fn run_impl(

let rx = ReceiveSet::new(rxs);
Ok(stream_to_rrd_on_disk(&rx, &rrd_path.into())?)
} else if args.serve_grpc {
if !catalog_endpoints.is_empty() {
anyhow::bail!("`--serve` does not support catalogs");
}

if !cfg!(feature = "server") {
_ = (call_source, rxs);
anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature");
}

#[cfg(feature = "server")]
{
let (signal, shutdown) = re_grpc_server::shutdown::shutdown();
// Spawn a server which the Web Viewer can connect to.
// All `rxs` are consumed by the server.
re_grpc_server::spawn_from_rx_set(
server_addr,
server_memory_limit,
shutdown,
ReceiveSet::new(rxs),
);

// Gracefully shut down the server on SIGINT
tokio_runtime_handle.block_on(tokio::signal::ctrl_c()).ok();

signal.stop();
}

Ok(())
} else if args.serve || args.serve_web {
if !catalog_endpoints.is_empty() {
anyhow::bail!("`--serve` does not support catalogs");
Expand Down Expand Up @@ -917,7 +965,7 @@ fn run_impl(
} else {
#[cfg(feature = "native_viewer")]
{
let tokio_runtime_handle = _tokio_runtime_handle.clone();
let tokio_runtime_handle = tokio_runtime_handle.clone();

return re_viewer::run_native_app(
_main_thread_token,
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun_c/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ test = false
[dependencies]
re_arrow_util.workspace = true
re_log = { workspace = true, features = ["setup"] }
re_sdk = { workspace = true, features = ["data_loaders"] }
re_sdk = { workspace = true, features = ["data_loaders", "server"] }
re_video.workspace = true

ahash.workspace = true
Expand Down
Loading
Loading