1
1
use std:: borrow:: Cow ;
2
2
3
+ use ahash:: { HashMap , HashMapExt } ;
3
4
use re_log_types:: { FileSource , LogMsg } ;
4
5
use re_smart_channel:: Sender ;
5
6
6
- use crate :: { extension , DataLoaderError , LoadedData } ;
7
+ use crate :: { DataLoaderError , LoadedData } ;
7
8
8
9
// ---
9
10
@@ -36,16 +37,7 @@ pub fn load_from_path(
36
37
37
38
let rx = load ( settings, path, None ) ?;
38
39
39
- // TODO(cmc): should we always unconditionally set store info though?
40
- // If we reach this point, then at least one compatible `DataLoader` has been found.
41
- let store_info = prepare_store_info ( & settings. store_id , file_source, path) ;
42
- if let Some ( store_info) = store_info {
43
- if tx. send ( store_info) . is_err ( ) {
44
- return Ok ( ( ) ) ; // other end has hung up.
45
- }
46
- }
47
-
48
- send ( & settings. store_id , rx, tx) ;
40
+ send ( settings. clone ( ) , file_source, path. to_owned ( ) , rx, tx) ;
49
41
50
42
Ok ( ( ) )
51
43
}
@@ -72,16 +64,7 @@ pub fn load_from_file_contents(
72
64
73
65
let data = load ( settings, filepath, Some ( contents) ) ?;
74
66
75
- // TODO(cmc): should we always unconditionally set store info though?
76
- // If we reach this point, then at least one compatible `DataLoader` has been found.
77
- let store_info = prepare_store_info ( & settings. store_id , file_source, filepath) ;
78
- if let Some ( store_info) = store_info {
79
- if tx. send ( store_info) . is_err ( ) {
80
- return Ok ( ( ) ) ; // other end has hung up.
81
- }
82
- }
83
-
84
- send ( & settings. store_id , data, tx) ;
67
+ send ( settings. clone ( ) , file_source, filepath. to_owned ( ) , data, tx) ;
85
68
86
69
Ok ( ( ) )
87
70
}
@@ -93,35 +76,25 @@ pub(crate) fn prepare_store_info(
93
76
store_id : & re_log_types:: StoreId ,
94
77
file_source : FileSource ,
95
78
path : & std:: path:: Path ,
96
- ) -> Option < LogMsg > {
79
+ ) -> LogMsg {
97
80
re_tracing:: profile_function!( path. display( ) . to_string( ) ) ;
98
81
99
82
use re_log_types:: SetStoreInfo ;
100
83
101
84
let app_id = re_log_types:: ApplicationId ( path. display ( ) . to_string ( ) ) ;
102
85
let store_source = re_log_types:: StoreSource :: File { file_source } ;
103
86
104
- let ext = extension ( path) ;
105
- let is_rrd = crate :: SUPPORTED_RERUN_EXTENSIONS . contains ( & ext. as_str ( ) ) ;
106
-
107
- ( !is_rrd) . then ( || {
108
- LogMsg :: SetStoreInfo ( SetStoreInfo {
109
- row_id : * re_chunk:: RowId :: new ( ) ,
110
- info : re_log_types:: StoreInfo {
111
- application_id : app_id. clone ( ) ,
112
- store_id : store_id. clone ( ) ,
113
- cloned_from : None ,
114
- is_official_example : false ,
115
- started : re_log_types:: Time :: now ( ) ,
116
- store_source,
117
- // NOTE: If this is a natively supported file, it will go through one of the
118
- // builtin dataloaders, i.e. the local version.
119
- // Otherwise, it will go through an arbitrary external loader, at which point we
120
- // have no certainty what the version is.
121
- store_version : crate :: is_supported_file_extension ( ext. as_str ( ) )
122
- . then_some ( re_build_info:: CrateVersion :: LOCAL ) ,
123
- } ,
124
- } )
87
+ LogMsg :: SetStoreInfo ( SetStoreInfo {
88
+ row_id : * re_chunk:: RowId :: new ( ) ,
89
+ info : re_log_types:: StoreInfo {
90
+ application_id : app_id. clone ( ) ,
91
+ store_id : store_id. clone ( ) ,
92
+ cloned_from : None ,
93
+ is_official_example : false ,
94
+ started : re_log_types:: Time :: now ( ) ,
95
+ store_source,
96
+ store_version : Some ( re_build_info:: CrateVersion :: LOCAL ) ,
97
+ } ,
125
98
} )
126
99
}
127
100
@@ -288,32 +261,63 @@ pub(crate) fn load(
288
261
///
289
262
/// Runs asynchronously from another thread on native, synchronously on wasm.
290
263
pub ( crate ) fn send (
291
- store_id : & re_log_types:: StoreId ,
264
+ settings : crate :: DataLoaderSettings ,
265
+ file_source : FileSource ,
266
+ path : std:: path:: PathBuf ,
292
267
rx_loader : std:: sync:: mpsc:: Receiver < LoadedData > ,
293
268
tx : & Sender < LogMsg > ,
294
269
) {
295
270
spawn ( {
296
271
re_tracing:: profile_function!( ) ;
297
272
273
+ let mut store_info_tracker: HashMap < re_log_types:: StoreId , bool > = HashMap :: new ( ) ;
274
+
298
275
let tx = tx. clone ( ) ;
299
- let store_id = store_id. clone ( ) ;
300
276
move || {
301
277
// ## Ignoring channel errors
302
278
//
303
279
// Not our problem whether or not the other end has hung up, but we still want to
304
280
// poll the channel in any case so as to make sure that the data producer
305
281
// doesn't get stuck.
306
282
for data in rx_loader {
307
- let msg = match data. into_log_msg ( & store_id) {
308
- Ok ( msg) => msg,
283
+ let msg = match data. into_log_msg ( ) {
284
+ Ok ( msg) => {
285
+ let store_info = match & msg {
286
+ LogMsg :: SetStoreInfo ( set_store_info) => {
287
+ Some ( ( set_store_info. info . store_id . clone ( ) , true ) )
288
+ }
289
+ LogMsg :: ArrowMsg ( store_id, _arrow_msg) => {
290
+ Some ( ( store_id. clone ( ) , false ) )
291
+ }
292
+ LogMsg :: BlueprintActivationCommand ( _) => None ,
293
+ } ;
294
+
295
+ if let Some ( ( store_id, store_info_created) ) = store_info {
296
+ * store_info_tracker. entry ( store_id) . or_default ( ) |= store_info_created;
297
+ }
298
+
299
+ msg
300
+ }
309
301
Err ( err) => {
310
- re_log:: error!( %err, %store_id , "Couldn't serialize component data" ) ;
302
+ re_log:: error!( %err, "Couldn't serialize component data" ) ;
311
303
continue ;
312
304
}
313
305
} ;
314
306
tx. send ( msg) . ok ( ) ;
315
307
}
316
308
309
+ for ( store_id, store_info_already_created) in store_info_tracker {
310
+ let is_a_preexisting_recording =
311
+ Some ( & store_id) == settings. opened_store_id . as_ref ( ) ;
312
+
313
+ if store_info_already_created || is_a_preexisting_recording {
314
+ continue ;
315
+ }
316
+
317
+ let store_info = prepare_store_info ( & store_id, file_source. clone ( ) , & path) ;
318
+ tx. send ( store_info) . ok ( ) ;
319
+ }
320
+
317
321
tx. quit ( None ) . ok ( ) ;
318
322
}
319
323
} ) ;
0 commit comments