Skip to content

Commit 9b07560

Browse files
grtlrabey79
authored andcommitted
Improve and mitigate warnings around dataloss when flushing (#9846)
### Related * Closes #9818. ### What > [!IMPORTANT] > This PR also changes the way `RecordingStream` is free'd in the C/C++ API. Before we called `stream.disconnect`, which unnecessarily replaced the current sink with a _buffered_ sink that would be immediately dropped afterwards. Not only did this cause spam in the log outputs, it also lead to race conditions upon (log) application shutdown. This PR makes it more explicit why we drop data during flushing, by bumping the log messages to `warn!`. It also improves the message by pointing the users to `flush_timeout`. We also bump the default timeout from two seconds to now 3 seconds. It's worth taking note that explicitly calling `flush_blocking` from our SDKs should be able to opt-out of this timeout, to ensure all data is sent. This will be tracked here: * #9845.
1 parent e860f33 commit 9b07560

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

crates/store/re_grpc_client/src/message_proxy/write.rs

+11-12
Original file line numberDiff line numberDiff line change
@@ -88,26 +88,25 @@ impl Client {
8888
};
8989

9090
let start = std::time::Instant::now();
91+
9192
loop {
9293
match rx.try_recv() {
9394
Ok(_) => {
94-
re_log::debug!("Flush complete");
95+
re_log::trace!("Flush complete");
9596
break;
9697
}
9798
Err(TryRecvError::Empty) => {
98-
let Some(timeout) = self.flush_timeout else {
99-
std::thread::yield_now();
100-
continue;
101-
};
102-
103-
let elapsed = start.elapsed();
104-
if elapsed >= timeout {
105-
re_log::debug!("Flush timed out, not all messages were sent");
106-
break;
99+
if let Some(timeout) = self.flush_timeout {
100+
let elapsed = start.elapsed();
101+
if elapsed >= timeout {
102+
re_log::warn!("Flush timed out, not all messages were sent. The timeout can be adjusted when connecting via gRPC.");
103+
break;
104+
}
107105
}
106+
std::thread::yield_now();
108107
}
109108
Err(TryRecvError::Closed) => {
110-
re_log::debug!("Flush failed, not all messages were sent");
109+
re_log::warn!("Flush failed, not all messages were sent");
111110
break;
112111
}
113112
}
@@ -123,7 +122,7 @@ impl Drop for Client {
123122

124123
// Quit immediately - no more messages left in the queue
125124
if let Err(err) = self.shutdown_tx.try_send(()) {
126-
re_log::error!("failed to gracefully shut down message proxy client: {err}");
125+
re_log::error!("Failed to gracefully shut down message proxy client: {err}");
127126
return;
128127
};
129128

crates/top/re_sdk/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub fn default_server_addr() -> std::net::SocketAddr {
5252
#[allow(clippy::unnecessary_wraps)]
5353
pub fn default_flush_timeout() -> Option<std::time::Duration> {
5454
// NOTE: This is part of the SDK and meant to be used where we accept `Option<std::time::Duration>` values.
55-
Some(std::time::Duration::from_secs(2))
55+
Some(std::time::Duration::from_secs(3))
5656
}
5757

5858
pub use re_log_types::{

crates/top/rerun_c/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,10 @@ thread_local! {
512512
pub extern "C" fn rr_recording_stream_free(id: CRecordingStream) {
513513
if THREAD_LIFE_TRACKER.try_with(|_v| {}).is_ok() {
514514
if let Some(stream) = RECORDING_STREAMS.lock().remove(id) {
515-
stream.disconnect();
515+
// Before we called `stream.disconnect()` here`, which unnecessarily replaced the current sink with a
516+
// buffered sink that would be immediately dropped afterwards. Not only did this cause spam in the
517+
// log outputs, it also lead to race conditions upon (log) application shutdown.
518+
drop(stream);
516519
}
517520
} else {
518521
// Yes, at least as of writing we can still log things in this state!

0 commit comments

Comments
 (0)