
Commit 68e8f28

jprochazk, Wumpf, and emilk authored
Add serve_grpc API (#9447)
* Closes #9404

This PR adds the `serve_grpc` API to all our SDKs, and the `--serve-grpc` option to the CLI. It can be used to spawn a gRPC server without requiring users to also spawn a Viewer. All of this is gated behind the `server` feature in both the CLI and our SDKs.

## Examples

### Connect a native Viewer to a gRPC server hosted by the SDK

Host a gRPC server, log some data to it, and then wait for incoming connections:

<details><summary>Python</summary>

```python
import time

import numpy as np
import rerun as rr

rr.init("rerun_example_serve_grpc")
rr.serve_grpc()

SIZE = 10

pos_grid = np.meshgrid(*[np.linspace(-10, 10, SIZE)] * 3)
positions = np.vstack([d.reshape(-1) for d in pos_grid]).T

col_grid = np.meshgrid(*[np.linspace(0, 255, SIZE)] * 3)
colors = np.vstack([c.reshape(-1) for c in col_grid]).astype(np.uint8).T

rr.log("my_points", rr.Points3D(positions, colors=colors, radii=0.5))

# We have to explicitly wait here, otherwise the server will shut down immediately.
while True:
    try:
        time.sleep(1)
    except KeyboardInterrupt:
        break
```

</details>

<details><summary>C++</summary>

```cpp
#include <rerun.hpp>
#include <rerun/demo_utils.hpp>

#include <chrono>
#include <thread>

using rerun::demo::grid3d;

int main() {
    // Create a new `RecordingStream` which sends data over gRPC to the viewer process.
    const auto rec = rerun::RecordingStream("rerun_example_serve_grpc_cpp");

    // Host a gRPC server and stream all data to it.
    rec.serve_grpc().exit_on_failure();

    // Create some data using the `grid3d` utility function.
    std::vector<rerun::Position3D> points = grid3d<rerun::Position3D, float>(-10.f, 10.f, 10);
    std::vector<rerun::Color> colors = grid3d<rerun::Color, uint8_t>(0, 255, 10);

    // Log the "my_points" entity with our data, using the `Points3D` archetype.
    rec.log("my_points", rerun::Points3D(points).with_colors(colors).with_radii({0.5f}));

    // We have to explicitly wait here, otherwise the server will shut down immediately.
    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
```

</details>

<details><summary>Rust</summary>

```rust
use rerun::{demo_util::grid, external::glam};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rec = rerun::RecordingStreamBuilder::new("rerun_example_serve_grpc_rs").serve_grpc()?;

    let points = grid(glam::Vec3::splat(-10.0), glam::Vec3::splat(10.0), 10);
    let colors = grid(glam::Vec3::ZERO, glam::Vec3::splat(255.0), 10)
        .map(|v| rerun::Color::from_rgb(v.x as u8, v.y as u8, v.z as u8));

    rec.log(
        "my_points",
        &rerun::Points3D::new(points)
            .with_colors(colors)
            .with_radii([0.5]),
    )?;

    // We have to explicitly wait here, otherwise the server will shut down immediately.
    loop {
        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}
```

</details>

Then connect to the server:

```
rerun --connect
```

### Connect a native Viewer to a gRPC server hosted by the CLI, with data supplied by the Python SDK

First, start the gRPC server:

```
rerun --serve-grpc
```

Then, log some data to it:

```python
import numpy as np
import rerun as rr

rr.init("rerun_example_serve_grpc")
rr.connect_grpc()

SIZE = 10

pos_grid = np.meshgrid(*[np.linspace(-10, 10, SIZE)] * 3)
positions = np.vstack([d.reshape(-1) for d in pos_grid]).T

col_grid = np.meshgrid(*[np.linspace(0, 255, SIZE)] * 3)
colors = np.vstack([c.reshape(-1) for c in col_grid]).astype(np.uint8).T

rr.log("my_points", rr.Points3D(positions, colors=colors, radii=0.5))
```

Finally, connect to the server from a native Viewer:

```
rerun --connect
```

---------

Co-authored-by: Andreas Reich <[email protected]>
Co-authored-by: Emil Ernerfeldt <[email protected]>
1 parent 83a9512 · commit 68e8f28

File tree

27 files changed: +551 -21 lines


crates/store/re_grpc_client/src/lib.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -44,7 +44,7 @@ impl std::error::Error for TonicStatusError {
 pub enum StreamError {
     /// Native connection error
     #[cfg(not(target_arch = "wasm32"))]
-    #[error(transparent)]
+    #[error("connection failed: {0}")]
     Transport(#[from] tonic::transport::Error),
 
     #[error(transparent)]
```

crates/store/re_grpc_server/src/lib.rs

Lines changed: 6 additions & 1 deletion
```diff
@@ -73,7 +73,12 @@ async fn serve_impl(
     let incoming =
         TcpIncoming::from_listener(tcp_listener, true, None).expect("failed to init listener");
 
-    re_log::info!("Listening for gRPC connections on {addr}");
+    let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
+        format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
+    } else {
+        format!("rerun+http://{addr}/proxy")
+    };
+    re_log::info!("Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`");
 
     let cors = CorsLayer::very_permissive();
     let grpc_web = tonic_web::GrpcWebLayer::new();
```
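To make the new connect hint concrete, here is the address selection pulled out into a standalone, runnable sketch (the `connect_url` helper and the example addresses are ours, not part of the commit):

```rust
use std::net::SocketAddr;

/// Mirrors the hunk above: loopback and unspecified (`0.0.0.0`) bind addresses
/// are advertised as `127.0.0.1`, since that is what a local `rerun --connect`
/// should dial; any other address is advertised verbatim.
fn connect_url(addr: SocketAddr) -> String {
    if addr.ip().is_loopback() || addr.ip().is_unspecified() {
        format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
    } else {
        format!("rerun+http://{addr}/proxy")
    }
}

fn main() {
    let unspecified: SocketAddr = "0.0.0.0:9876".parse().unwrap();
    let public: SocketAddr = "192.168.1.5:9876".parse().unwrap();
    assert_eq!(connect_url(unspecified), "rerun+http://127.0.0.1:9876/proxy");
    assert_eq!(connect_url(public), "rerun+http://192.168.1.5:9876/proxy");
}
```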

crates/top/re_sdk/Cargo.toml

Lines changed: 2 additions & 0 deletions
```diff
@@ -46,6 +46,8 @@ web_viewer = [
   "dep:webbrowser",
 ]
 
+server = ["dep:re_smart_channel", "dep:tokio"]
+
 
 [dependencies]
 re_build_info.workspace = true
```

crates/top/re_sdk/src/grpc_server.rs

Lines changed: 79 additions & 0 deletions
New file:

```rust
use re_log_types::LogMsg;

/// A [`crate::sink::LogSink`] tied to a hosted Rerun gRPC server.
///
/// The hosted gRPC server may be connected to by any SDK or Viewer.
///
/// All data sent through this sink is immediately redirected to the gRPC server.
pub struct GrpcServerSink {
    /// Sender to send messages to the gRPC server.
    sender: re_smart_channel::Sender<LogMsg>,

    /// The gRPC server thread.
    _server_handle: std::thread::JoinHandle<()>,

    /// Signal used to shut down the gRPC server.
    server_shutdown_signal: re_grpc_server::shutdown::Signal,
}

impl GrpcServerSink {
    /// A `bind_ip` of `"0.0.0.0"` is a good default.
    pub fn new(
        bind_ip: &str,
        grpc_port: u16,
        server_memory_limit: re_memory::MemoryLimit,
    ) -> Result<Self, std::net::AddrParseError> {
        let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();

        let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
        let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
            re_smart_channel::SmartMessageSource::MessageProxy {
                url: format!("rerun+http://{grpc_server_addr}/proxy"),
            },
            re_smart_channel::SmartChannelSource::Sdk,
        );
        let server_handle = std::thread::Builder::new()
            .name("message_proxy_server".to_owned())
            .spawn(move || {
                let mut builder = tokio::runtime::Builder::new_current_thread();
                builder.enable_all();
                let rt = builder.build().expect("failed to build tokio runtime");

                rt.block_on(re_grpc_server::serve_from_channel(
                    grpc_server_addr,
                    server_memory_limit,
                    shutdown,
                    channel_rx,
                ));
            })
            .expect("failed to spawn thread for message proxy server");

        Ok(Self {
            sender: channel_tx,
            _server_handle: server_handle,
            server_shutdown_signal,
        })
    }
}

impl crate::sink::LogSink for GrpcServerSink {
    fn send(&self, msg: LogMsg) {
        if let Err(err) = self.sender.send(msg) {
            re_log::error_once!("Failed to send log message to gRPC server: {err}");
        }
    }

    #[inline]
    fn flush_blocking(&self) {
        if let Err(err) = self.sender.flush_blocking() {
            re_log::error_once!("Failed to flush: {err}");
        }
    }
}

impl Drop for GrpcServerSink {
    fn drop(&mut self) {
        self.sender.flush_blocking().ok();
        self.server_shutdown_signal.stop();
    }
}
```
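Although `RecordingStream` is the intended entry point (see `recording_stream.rs` below), the sink can also be constructed directly. A minimal sketch, assuming the `server` feature is enabled and `re_memory` is available as a direct dependency:

```rust
use re_sdk::grpc_server::GrpcServerSink;

fn main() -> Result<(), std::net::AddrParseError> {
    // Bind on all interfaces, port 9876, buffering at most 25% of total RAM.
    let sink = GrpcServerSink::new(
        "0.0.0.0",
        9876,
        re_memory::MemoryLimit::from_fraction_of_total(0.25),
    )?;

    // Normally the sink is installed via `RecordingStream::set_sink`, which then
    // feeds it `LogMsg`es. Dropping it flushes pending messages and signals the
    // server to shut down (see the `Drop` impl above).
    drop(sink);
    Ok(())
}
```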

crates/top/re_sdk/src/lib.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -123,6 +123,10 @@ pub use re_data_loader::{DataLoader, DataLoaderError, DataLoaderSettings, Loaded
 #[cfg(feature = "web_viewer")]
 pub mod web_viewer;
 
+/// Method for spawning a gRPC server and streaming the SDK log stream to it.
+#[cfg(feature = "server")]
+pub mod grpc_server;
+
 /// Re-exports of other crates.
 pub mod external {
     pub use re_grpc_client;
```

crates/top/re_sdk/src/recording_stream.rs

Lines changed: 104 additions & 0 deletions
```diff
@@ -97,6 +97,10 @@ pub enum RecordingStreamError {
     /// Invalid endpoint
     #[error("not a `/proxy` endpoint")]
     NotAProxyEndpoint,
+
+    /// Invalid bind IP.
+    #[error(transparent)]
+    InvalidAddress(#[from] std::net::AddrParseError),
 }
 
 /// Results that can occur when creating/manipulating a [`RecordingStream`].
@@ -400,6 +404,63 @@ impl RecordingStreamBuilder {
         }
     }
 
+    #[cfg(feature = "server")]
+    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
+    /// locally hosted gRPC server.
+    ///
+    /// The server is hosted on the default IP and port, and may be connected to by any SDK or
+    /// Viewer at `rerun+http://127.0.0.1:9876/proxy`.
+    ///
+    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
+    ///
+    /// The gRPC server will buffer in memory so that late connecting viewers will still get all the data.
+    /// You can limit the amount of data buffered by the gRPC server using [`Self::serve_grpc_opts`],
+    /// with the `server_memory_limit` argument. Once the memory limit is reached, the earliest logged data
+    /// will be dropped. Static data is never dropped.
+    pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
+        self.serve_grpc_opts(
+            "0.0.0.0",
+            crate::DEFAULT_SERVER_PORT,
+            re_memory::MemoryLimit::from_fraction_of_total(0.75),
+        )
+    }
+
+    #[cfg(feature = "server")]
+    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
+    /// locally hosted gRPC server.
+    ///
+    /// The server is hosted on the given `bind_ip` and `port`, and may be connected to by any SDK
+    /// or Viewer at `rerun+http://{bind_ip}:{port}/proxy`.
+    ///
+    /// `0.0.0.0` is a good default for `bind_ip`.
+    ///
+    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
+    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
+    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
+    pub fn serve_grpc_opts(
+        self,
+        bind_ip: impl AsRef<str>,
+        port: u16,
+        server_memory_limit: re_memory::MemoryLimit,
+    ) -> RecordingStreamResult<RecordingStream> {
+        let (enabled, store_info, properties, batcher_config) = self.into_args();
+        if enabled {
+            RecordingStream::new(
+                store_info,
+                properties,
+                batcher_config,
+                Box::new(crate::grpc_server::GrpcServerSink::new(
+                    bind_ip.as_ref(),
+                    port,
+                    server_memory_limit,
+                )?),
+            )
+        } else {
+            re_log::debug!("Rerun disabled - call to serve_grpc() ignored");
+            Ok(RecordingStream::disabled())
+        }
+    }
+
     /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
     /// RRD file on disk.
     ///
@@ -1818,6 +1879,49 @@ impl RecordingStream {
         Ok(())
     }
 
+    #[cfg(feature = "server")]
+    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
+    /// `rerun+http://127.0.0.1:9876/proxy`.
+    ///
+    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
+    ///
+    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
+    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
+    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
+    pub fn serve_grpc(
+        &self,
+        server_memory_limit: re_memory::MemoryLimit,
+    ) -> RecordingStreamResult<()> {
+        self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_memory_limit)
+    }
+
+    #[cfg(feature = "server")]
+    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
+    /// `rerun+http://{bind_ip}:{port}/proxy`.
+    ///
+    /// `0.0.0.0` is a good default for `bind_ip`.
+    ///
+    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
+    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
+    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
+    pub fn serve_grpc_opts(
+        &self,
+        bind_ip: impl AsRef<str>,
+        port: u16,
+        server_memory_limit: re_memory::MemoryLimit,
+    ) -> RecordingStreamResult<()> {
+        if forced_sink_path().is_some() {
+            re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
+            return Ok(());
+        }
+
+        let sink =
+            crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_memory_limit)?;
+
+        self.set_sink(Box::new(sink));
+        Ok(())
+    }
+
     /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
     /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
     /// new process.
```
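Putting the new builder method to work looks much like the Rust example in the commit message; here is a sketch using the explicit `_opts` variant (application id, port, and memory limit are illustrative, and we assume the `rerun` crate re-exports `MemoryLimit`):

```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Host the gRPC server on 0.0.0.0:9876, buffering at most half of total RAM.
    let rec = rerun::RecordingStreamBuilder::new("rerun_example_serve_grpc_opts")
        .serve_grpc_opts(
            "0.0.0.0",
            9876,
            rerun::MemoryLimit::from_fraction_of_total(0.5),
        )?;

    rec.log("status", &rerun::TextLog::new("serving over gRPC"))?;

    // Keep the process alive, otherwise the server shuts down immediately.
    loop {
        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}
```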

crates/top/rerun/Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -101,7 +101,7 @@ run = [
 ]
 
 ## Support for running a gRPC server that listens to incoming log messages from a Rerun SDK.
-server = ["dep:re_grpc_server"]
+server = ["dep:re_grpc_server", "re_sdk/server", "tokio/signal"]
 
 ## Embed the Rerun SDK & built-in types and re-export all of their public symbols.
 sdk = ["dep:re_sdk", "dep:re_types"]
```
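For downstream users of the `rerun` crate, opting into the new server support would look roughly like this (a sketch; the version number is illustrative):

```toml
[dependencies]
rerun = { version = "0.23", features = ["server"] }
```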

crates/top/rerun/src/commands/entrypoint.rs

Lines changed: 56 additions & 8 deletions
```diff
@@ -57,12 +57,18 @@ Examples:
     Open an .rrd file and stream it to a Web Viewer:
         rerun recording.rrd --web-viewer
 
-    Host a Rerun gRPC server which listens for incoming connections from the logging SDK, buffer the log messages, and serves the results:
+    Host a Rerun gRPC server which listens for incoming connections from the logging SDK, buffer the log messages, and serve the results:
         rerun --serve-web
 
     Host a Rerun Server which serves a recording from a file over gRPC to any connecting Rerun Viewers:
         rerun --serve-web recording.rrd
 
+    Host a Rerun gRPC server without spawning a Viewer:
+        rerun --serve-grpc
+
+    Spawn a Viewer without also hosting a gRPC server:
+        rerun --connect
+
     Connect to a Rerun Server:
         rerun rerun+http://localhost:9877/proxy
 
@@ -158,6 +164,13 @@ When persisted, the state will be stored at the following locations:
     #[clap(long)]
     serve_web: bool,
 
+    /// This will host a gRPC server.
+    ///
+    /// The server will act like a proxy, listening for incoming connections from
+    /// logging SDKs, and forwarding them to Rerun Viewers.
+    #[clap(long)]
+    serve_grpc: bool,
+
     /// Do not attempt to start a new server, instead try to connect to an existing one.
     ///
     /// Optionally accepts an HTTP(S) URL to a gRPC server.
@@ -638,7 +651,7 @@ fn run_impl(
     _build_info: re_build_info::BuildInfo,
     call_source: CallSource,
     args: Args,
-    _tokio_runtime_handle: &tokio::runtime::Handle,
+    tokio_runtime_handle: &tokio::runtime::Handle,
 ) -> anyhow::Result<()> {
     #[cfg(feature = "native_viewer")]
     let profiler = run_profiler(&args);
@@ -660,8 +673,11 @@
         re_viewer::StartupOptions {
             hide_welcome_screen: args.hide_welcome_screen,
             detach_process: args.detach_process,
-            memory_limit: re_memory::MemoryLimit::parse(&args.memory_limit)
-                .map_err(|err| anyhow::format_err!("Bad --memory-limit: {err}"))?,
+            memory_limit: {
+                re_log::debug!("Parsing memory limit for Viewer");
+                re_memory::MemoryLimit::parse(&args.memory_limit)
+                    .map_err(|err| anyhow::format_err!("Bad --memory-limit: {err}"))?
+            },
             persist_state: args.persist_state,
             is_in_notebook: false,
             screenshot_to_path_then_quit: args.screenshot_to.clone(),
@@ -690,8 +706,11 @@
     #[cfg(feature = "server")]
     let server_addr = std::net::SocketAddr::new(args.bind, args.port);
     #[cfg(feature = "server")]
-    let server_memory_limit = re_memory::MemoryLimit::parse(&args.server_memory_limit)
-        .map_err(|err| anyhow::format_err!("Bad --server-memory-limit: {err}"))?;
+    let server_memory_limit = {
+        re_log::debug!("Parsing memory limit for gRPC server");
+        re_memory::MemoryLimit::parse(&args.server_memory_limit)
+            .map_err(|err| anyhow::format_err!("Bad --server-memory-limit: {err}"))?
+    };
 
     #[allow(unused_variables)]
     let (command_sender, command_receiver) = re_viewer_context::command_channel();
@@ -781,7 +800,7 @@
         // we want all receivers to push their data to the server.
         // For that we spawn the server a bit further down, after we've collected
         // all receivers into `rxs`.
-    } else if !args.serve && !args.serve_web {
+    } else if !args.serve && !args.serve_web && !args.serve_grpc {
         let server: Receiver<LogMsg> = re_grpc_server::spawn_with_recv(
             server_addr,
             server_memory_limit,
@@ -810,6 +829,35 @@
 
         let rx = ReceiveSet::new(rxs);
         Ok(stream_to_rrd_on_disk(&rx, &rrd_path.into())?)
+    } else if args.serve_grpc {
+        if !catalog_endpoints.is_empty() {
+            anyhow::bail!("`--serve` does not support catalogs");
+        }
+
+        if !cfg!(feature = "server") {
+            _ = (call_source, rxs);
+            anyhow::bail!("Can't host server - rerun was not compiled with the 'server' feature");
+        }
+
+        #[cfg(feature = "server")]
+        {
+            let (signal, shutdown) = re_grpc_server::shutdown::shutdown();
+            // Spawn a server which the Web Viewer can connect to.
+            // All `rxs` are consumed by the server.
+            re_grpc_server::spawn_from_rx_set(
+                server_addr,
+                server_memory_limit,
+                shutdown,
+                ReceiveSet::new(rxs),
+            );
+
+            // Gracefully shut down the server on SIGINT
+            tokio_runtime_handle.block_on(tokio::signal::ctrl_c()).ok();
+
+            signal.stop();
+        }
+
+        Ok(())
     } else if args.serve || args.serve_web {
         if !catalog_endpoints.is_empty() {
             anyhow::bail!("`--serve` does not support catalogs");
@@ -915,7 +963,7 @@
     } else {
         #[cfg(feature = "native_viewer")]
         {
-            let tokio_runtime_handle = _tokio_runtime_handle.clone();
+            let tokio_runtime_handle = tokio_runtime_handle.clone();
 
             return re_viewer::run_native_app(
                 _main_thread_token,
```
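For reference, the new flag composes with the existing `--connect` flow roughly like so (assuming the default port, 9876):

```
# Terminal 1: host a standalone gRPC server. Ctrl-C shuts it down gracefully.
rerun --serve-grpc

# Terminal 2: connect a native Viewer to it.
rerun --connect rerun+http://127.0.0.1:9876/proxy
```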

crates/top/rerun_c/Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -37,7 +37,7 @@ test = false
 [dependencies]
 re_arrow_util.workspace = true
 re_log = { workspace = true, features = ["setup"] }
-re_sdk = { workspace = true, features = ["data_loaders"] }
+re_sdk = { workspace = true, features = ["data_loaders", "server"] }
 re_video.workspace = true
 
 ahash.workspace = true
```
