-
Notifications
You must be signed in to change notification settings - Fork 451
Add serve_grpc
API
#9447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add serve_grpc
API
#9447
Changes from all commits
828624d
9516f02
44b5223
2f4437b
8b3b0c1
9c61bb6
d9d7a15
2ef929c
bdbe265
79eb24f
85e0ded
c6d9a9e
0304de9
e92cee9
452259b
ec43eaa
5481ca0
243a5dc
d47dea3
c4be7e9
cdf7c88
3f5391c
256e7ed
ec63746
ddf8b9b
b1f440f
5a4cb4a
7bdcbc7
716188d
6907012
a293330
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
use re_log_types::LogMsg; | ||
|
||
/// A [`crate::sink::LogSink`] tied to a hosted Rerun gRPC server. | ||
/// | ||
/// The hosted gRPC server may be connected to by any SDK or Viewer. | ||
/// | ||
/// All data sent through this sink is immediately redirected to the gRPC server. | ||
pub struct GrpcServerSink { | ||
/// Sender to send messages to the gRPC server. | ||
sender: re_smart_channel::Sender<LogMsg>, | ||
|
||
/// The gRPC server thread. | ||
_server_handle: std::thread::JoinHandle<()>, | ||
|
||
/// Rerun websocket server. | ||
server_shutdown_signal: re_grpc_server::shutdown::Signal, | ||
} | ||
|
||
impl GrpcServerSink { | ||
/// A `bind_ip` of `"0.0.0.0"` is a good default. | ||
pub fn new( | ||
bind_ip: &str, | ||
grpc_port: u16, | ||
server_memory_limit: re_memory::MemoryLimit, | ||
) -> Result<Self, std::net::AddrParseError> { | ||
let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown(); | ||
|
||
let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?; | ||
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>( | ||
re_smart_channel::SmartMessageSource::MessageProxy { | ||
url: format!("rerun+http://{grpc_server_addr}/proxy"), | ||
}, | ||
re_smart_channel::SmartChannelSource::Sdk, | ||
); | ||
let server_handle = std::thread::Builder::new() | ||
.name("message_proxy_server".to_owned()) | ||
.spawn(move || { | ||
let mut builder = tokio::runtime::Builder::new_current_thread(); | ||
builder.enable_all(); | ||
let rt = builder.build().expect("failed to build tokio runtime"); | ||
|
||
rt.block_on(re_grpc_server::serve_from_channel( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this work even if the user already has There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we're spawning a new thread and using a single-threaded runtime, so anything goes. |
||
grpc_server_addr, | ||
server_memory_limit, | ||
shutdown, | ||
channel_rx, | ||
)); | ||
}) | ||
.expect("failed to spawn thread for message proxy server"); | ||
|
||
Ok(Self { | ||
sender: channel_tx, | ||
_server_handle: server_handle, | ||
server_shutdown_signal, | ||
}) | ||
} | ||
} | ||
|
||
impl crate::sink::LogSink for GrpcServerSink { | ||
fn send(&self, msg: LogMsg) { | ||
if let Err(err) = self.sender.send(msg) { | ||
re_log::error_once!("Failed to send log message to gRPC server: {err}"); | ||
} | ||
} | ||
|
||
#[inline] | ||
fn flush_blocking(&self) { | ||
if let Err(err) = self.sender.flush_blocking() { | ||
re_log::error_once!("Failed to flush: {err}"); | ||
} | ||
} | ||
} | ||
|
||
impl Drop for GrpcServerSink { | ||
fn drop(&mut self) { | ||
self.sender.flush_blocking().ok(); | ||
self.server_shutdown_signal.stop(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very nice