@@ -28,13 +28,17 @@ use crate::{server_url, RerunServerError, RerunServerPort};
28
28
struct MessageQueue {
29
29
server_memory_limit : MemoryLimit ,
30
30
messages : VecDeque < Vec < u8 > > ,
31
+
32
+ /// Never garbage collected.
33
+ messages_static : VecDeque < Vec < u8 > > ,
31
34
}
32
35
33
36
impl MessageQueue {
34
37
pub fn new ( server_memory_limit : MemoryLimit ) -> Self {
35
38
Self {
36
39
server_memory_limit,
37
40
messages : Default :: default ( ) ,
41
+ messages_static : Default :: default ( ) ,
38
42
}
39
43
}
40
44
@@ -43,6 +47,15 @@ impl MessageQueue {
43
47
self . messages . push_back ( msg) ;
44
48
}
45
49
50
+ /// Messages pushed using this method will stay around indefinitely.
51
+ ///
52
+ /// Useful e.g. for `SetStoreInfo` messages, so that clients late to the party actually get a
53
+ /// chance of receiving them.
54
+ pub fn push_static ( & mut self , msg : Vec < u8 > ) {
55
+ self . gc_if_using_too_much_ram ( ) ;
56
+ self . messages_static . push_back ( msg) ;
57
+ }
58
+
46
59
fn gc_if_using_too_much_ram ( & mut self ) {
47
60
re_tracing:: profile_function!( ) ;
48
61
@@ -365,7 +378,13 @@ impl ReceiveSetBroadcaster {
365
378
}
366
379
} ) ;
367
380
368
- inner. history . push ( msg) ;
381
+ let msg_is_data = matches ! ( data, LogMsg :: ArrowMsg ( _, _) ) ;
382
+ if msg_is_data {
383
+ inner. history . push ( msg) ;
384
+ } else {
385
+ // Keep non-data commands around for clients late to the party.
386
+ inner. history . push_static ( msg) ;
387
+ }
369
388
}
370
389
371
390
re_smart_channel:: SmartMessagePayload :: Flush { on_flush_done } => {
@@ -395,6 +414,13 @@ impl ReceiveSetBroadcaster {
395
414
// Meaning that if a new one connects, we stall the old connections until we have sent all messages to this one.
396
415
let mut inner = self . inner . lock ( ) ;
397
416
417
+ for msg in & inner. history . messages_static {
418
+ if let Err ( err) = client. send ( tungstenite:: Message :: Binary ( msg. clone ( ) ) ) {
419
+ re_log:: warn!( "Error sending static message to web socket client: {err}" ) ;
420
+ return ;
421
+ }
422
+ }
423
+
398
424
for msg in & inner. history . messages {
399
425
if let Err ( err) = client. send ( tungstenite:: Message :: Binary ( msg. clone ( ) ) ) {
400
426
re_log:: warn!( "Error sending message to web socket client: {err}" ) ;
0 commit comments