@@ -5,6 +5,7 @@ use std::sync::{atomic::AtomicI64, Arc};
5
5
use ahash:: HashMap ;
6
6
use crossbeam:: channel:: { Receiver , Sender } ;
7
7
8
+ use parking_lot:: Mutex ;
8
9
use re_log_types:: {
9
10
ApplicationId , ArrowChunkReleaseCallback , DataCell , DataCellError , DataRow , DataTable ,
10
11
DataTableBatcher , DataTableBatcherConfig , DataTableBatcherError , EntityPath , LogMsg , RowId ,
@@ -61,7 +62,7 @@ pub enum RecordingStreamError {
61
62
#[ error( "Failed to spawn background thread '{name}': {err}" ) ]
62
63
SpawnThread {
63
64
/// Name of the thread
64
- name : & ' static str ,
65
+ name : String ,
65
66
66
67
/// Inner error explaining why the thread failed to spawn.
67
68
err : std:: io:: Error ,
@@ -79,6 +80,11 @@ pub enum RecordingStreamError {
79
80
/// An error that can occur because a row in the store has inconsistent columns.
80
81
#[ error( transparent) ]
81
82
DataReadError ( #[ from] re_log_types:: DataReadError ) ,
83
+
84
+ /// An error occurred while attempting to use a [`re_data_source::DataLoader`].
85
+ #[ cfg( feature = "data_loaders" ) ]
86
+ #[ error( transparent) ]
87
+ DataLoaderError ( #[ from] re_data_source:: DataLoaderError ) ,
82
88
}
83
89
84
90
/// Results that can occur when creating/manipulating a [`RecordingStream`].
@@ -623,6 +629,12 @@ struct RecordingStreamInner {
623
629
batcher : DataTableBatcher ,
624
630
batcher_to_sink_handle : Option < std:: thread:: JoinHandle < ( ) > > ,
625
631
632
+ /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
633
+ /// machinery in the context of this `RecordingStream`.
634
+ ///
635
+ /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
636
+ dataloader_handles : Mutex < Vec < std:: thread:: JoinHandle < ( ) > > > ,
637
+
626
638
pid_at_creation : u32 ,
627
639
}
628
640
@@ -633,6 +645,16 @@ impl Drop for RecordingStreamInner {
633
645
return ;
634
646
}
635
647
648
+ // Run all pending top-level `DataLoader` threads that were started from the SDK to completion.
649
+ //
650
+ // TODO(cmc): At some point we might want to make it configurable, though I cannot really
651
+ // think of a use case where you'd want to drop those threads immediately upon
652
+ // disconnection.
653
+ let dataloader_handles = std:: mem:: take ( & mut * self . dataloader_handles . lock ( ) ) ;
654
+ for handle in dataloader_handles {
655
+ handle. join ( ) . ok ( ) ;
656
+ }
657
+
636
658
// NOTE: The command channel is private, if we're here, nothing is currently capable of
637
659
// sending data down the pipeline.
638
660
self . batcher . flush_blocking ( ) ;
@@ -679,7 +701,10 @@ impl RecordingStreamInner {
679
701
let batcher = batcher. clone ( ) ;
680
702
move || forwarding_thread ( info, sink, cmds_rx, batcher. tables ( ) , on_release)
681
703
} )
682
- . map_err ( |err| RecordingStreamError :: SpawnThread { name : NAME , err } ) ?
704
+ . map_err ( |err| RecordingStreamError :: SpawnThread {
705
+ name : NAME . into ( ) ,
706
+ err,
707
+ } ) ?
683
708
} ;
684
709
685
710
Ok ( RecordingStreamInner {
@@ -688,6 +713,7 @@ impl RecordingStreamInner {
688
713
cmds_tx,
689
714
batcher,
690
715
batcher_to_sink_handle : Some ( batcher_to_sink_handle) ,
716
+ dataloader_handles : Mutex :: new ( Vec :: new ( ) ) ,
691
717
pid_at_creation : std:: process:: id ( ) ,
692
718
} )
693
719
}
@@ -989,6 +1015,103 @@ impl RecordingStream {
989
1015
990
1016
Ok ( ( ) )
991
1017
}
1018
+
1019
/// Streams the file located at `filepath` into this recording, going through every
/// [`re_data_source::DataLoader`] available.
///
/// More than one loader may end up handling the same `filepath`.
///
/// This method blocks until either at least one [`re_data_source::DataLoader`] starts
/// streaming data in or all of them fail.
///
/// See <https://www.rerun.io/docs/howto/open-any-file> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_path(
    &self,
    filepath: impl AsRef<std::path::Path>,
) -> RecordingStreamResult<()> {
    // No in-memory contents are supplied: the loaders are only handed the path.
    let contents = None;
    self.log_file(filepath, contents)
}
1034
+
1035
/// Streams the in-memory `contents` into this recording, going through every
/// [`re_data_source::DataLoader`] available.
///
/// `filepath` is forwarded to the loaders alongside the `contents`; more than one loader may
/// end up handling the same data.
///
/// This method blocks until either at least one [`re_data_source::DataLoader`] starts
/// streaming data in or all of them fail.
///
/// See <https://www.rerun.io/docs/howto/open-any-file> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_contents(
    &self,
    filepath: impl AsRef<std::path::Path>,
    contents: std::borrow::Cow<'_, [u8]>,
) -> RecordingStreamResult<()> {
    // Wrap the bytes so that the shared implementation takes the "from contents" route.
    let contents = Some(contents);
    self.log_file(filepath, contents)
}
1051
+
1052
/// Shared implementation of [`Self::log_file_from_path`] and [`Self::log_file_from_contents`].
///
/// When `contents` is `Some`, the bytes are handed over to
/// `re_data_source::load_from_file_contents`; otherwise only `filepath` is passed, to
/// `re_data_source::load_from_path`.
///
/// Every message received from the loaders is forwarded to this stream by a dedicated
/// background thread. The handle of that thread is stored in `dataloader_handles` so that it
/// can be joined later on.
///
/// If there is no recording to log to, the call is ignored (with a one-time warning) and
/// `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if the loading call fails, or if the forwarding thread cannot be spawned
/// ([`RecordingStreamError::SpawnThread`]).
#[cfg(feature = "data_loaders")]
fn log_file(
    &self,
    filepath: impl AsRef<std::path::Path>,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> RecordingStreamResult<()> {
    let filepath = filepath.as_ref();

    // Bail out first: there is no point in setting up a channel if there is no recording.
    let Some(store_id) = self.store_info().map(|info| info.store_id.clone()) else {
        re_log::warn_once!("Recording disabled - call to log_file() ignored");
        return Ok(());
    };

    // The thread is named after the public entry point that was used. This has to be decided
    // now, since `contents` is consumed below.
    let thread_name = if contents.is_some() {
        format!("log_file_from_contents({filepath:?})")
    } else {
        format!("log_file_from_path({filepath:?})")
    };

    let (tx, rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::Sdk,
        re_smart_channel::SmartChannelSource::File(filepath.into()),
    );

    if let Some(contents) = contents {
        re_data_source::load_from_file_contents(
            &store_id,
            re_log_types::FileSource::Sdk,
            filepath,
            contents,
            &tx,
        )?;
    } else {
        re_data_source::load_from_path(&store_id, re_log_types::FileSource::Sdk, filepath, &tx)?;
    }

    // Release our own sender: from now on only the loaders may still hold one.
    // NOTE(review): presumably the loaders keep their own clones of `tx`, so that `recv()`
    // below fails (and the thread exits) once the last of them is done -- confirm.
    drop(tx);

    let handle = std::thread::Builder::new()
        .name(thread_name.clone())
        .spawn({
            let this = self.clone();
            move || {
                // We can safely ignore the error on `recv()` as we're in complete control of
                // both ends of the channel.
                while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
                    this.record_msg(msg);
                }
            }
        })
        .map_err(|err| RecordingStreamError::SpawnThread {
            name: thread_name,
            err,
        })?;

    debug_assert!(
        self.inner.is_some(),
        "recording should always be fully init at this stage"
    );
    // Keep track of the thread so that it can be joined later on.
    if let Some(inner) = self.inner.as_ref() {
        inner.dataloader_handles.lock().push(handle);
    }

    Ok(())
}
992
1115
}
993
1116
994
1117
#[ allow( clippy:: needless_pass_by_value) ]
@@ -1450,6 +1573,22 @@ impl RecordingStream {
1450
1573
/// terms of data durability and ordering.
1451
1574
/// See [`Self::set_sink`] for more information.
1452
1575
pub fn disconnect ( & self ) {
1576
+ let Some ( this) = & * self . inner else {
1577
+ re_log:: warn_once!( "Recording disabled - call to disconnect() ignored" ) ;
1578
+ return ;
1579
+ } ;
1580
+
1581
+ // When disconnecting, we need to make sure that pending top-level `DataLoader` threads that
1582
+ // were started from the SDK run to completion.
1583
+ //
1584
+ // TODO(cmc): At some point we might want to make it configurable, though I cannot really
1585
+ // think of a use case where you'd want to drop those threads immediately upon
1586
+ // disconnection.
1587
+ let dataloader_handles = std:: mem:: take ( & mut * this. dataloader_handles . lock ( ) ) ;
1588
+ for handle in dataloader_handles {
1589
+ handle. join ( ) . ok ( ) ;
1590
+ }
1591
+
1453
1592
self . set_sink ( Box :: new ( crate :: sink:: BufferedSink :: new ( ) ) ) ;
1454
1593
}
1455
1594
}
@@ -1465,11 +1604,13 @@ impl fmt::Debug for RecordingStream {
1465
1604
cmds_tx : _,
1466
1605
batcher : _,
1467
1606
batcher_to_sink_handle : _,
1607
+ dataloader_handles,
1468
1608
pid_at_creation,
1469
1609
} ) => f
1470
1610
. debug_struct ( "RecordingStream" )
1471
1611
. field ( "info" , & info)
1472
1612
. field ( "tick" , & tick)
1613
+ . field ( "pending_dataloaders" , & dataloader_handles. lock ( ) . len ( ) )
1473
1614
. field ( "pid_at_creation" , & pid_at_creation)
1474
1615
. finish_non_exhaustive ( ) ,
1475
1616
None => write ! ( f, "RecordingStream {{ disabled }}" ) ,
0 commit comments