@@ -10,6 +10,8 @@ use std::{
10
10
use bimap:: BiHashMap ;
11
11
use binrw:: prelude:: * ;
12
12
use byteorder:: { WriteBytesExt , LE } ;
13
+ #[ cfg( feature = "zstd" ) ]
14
+ use zstd:: stream:: { raw as zraw, zio} ;
13
15
14
16
use crate :: {
15
17
chunk_sink:: { ChunkMode , ChunkSink } ,
@@ -311,6 +313,7 @@ struct SchemaContent<'a> {
311
313
/// and check for errors when done; otherwise the result will be unwrapped on drop.
312
314
pub struct Writer < W : Write + Seek > {
313
315
writer : Option < WriteMode < W > > ,
316
+ is_finished : bool ,
314
317
chunk_mode : ChunkMode ,
315
318
options : WriteOptions ,
316
319
schemas : BiHashMap < SchemaContent < ' static > , u16 > ,
@@ -363,6 +366,7 @@ impl<W: Write + Seek> Writer<W> {
363
366
364
367
Ok ( Self {
365
368
writer : Some ( WriteMode :: Raw ( writer) ) ,
369
+ is_finished : false ,
366
370
options : opts,
367
371
chunk_mode,
368
372
schemas : Default :: default ( ) ,
@@ -667,10 +671,8 @@ impl<W: Write + Seek> Writer<W> {
667
671
) -> McapResult < ( ) > {
668
672
self . finish_chunk ( ) ?;
669
673
670
- let prev_writer = self . writer . take ( ) . expect ( Self :: WHERE_WRITER ) ;
671
-
672
- let WriteMode :: Raw ( w) = prev_writer else {
673
- panic ! (
674
+ let WriteMode :: Raw ( w) = self . writer . take ( ) . expect ( Self :: WRITER_IS_NONE ) else {
675
+ unreachable ! (
674
676
"since finish_chunk was called, write mode is guaranteed to be raw at this point"
675
677
) ;
676
678
} ;
@@ -786,31 +788,35 @@ impl<W: Write + Seek> Writer<W> {
786
788
Ok ( ( ) )
787
789
}
788
790
789
- /// `.expect()` message when we go to write and self.writer is `None`,
790
- /// which should only happen when [`Writer::finish()`] was called.
791
- const WHERE_WRITER : & ' static str = "Trying to write a record on a finished MCAP" ;
791
+ const WRITER_IS_NONE : & ' static str = "unreachable: self.writer should never be None" ;
792
+
793
+ fn assert_not_finished ( & self ) {
794
+ assert ! (
795
+ !self . is_finished,
796
+ "{}" ,
797
+ "Trying to write a record on a finished MCAP"
798
+ ) ;
799
+ }
792
800
793
801
/// Starts a new chunk if we haven't done so already.
794
802
fn start_chunk ( & mut self ) -> McapResult < & mut ChunkWriter < W > > {
803
+ self . assert_not_finished ( ) ;
804
+
795
805
// It is not possible to start writing a chunk if we're still writing an attachment. Return
796
806
// an error instead.
797
807
if let Some ( WriteMode :: Attachment ( ..) ) = self . writer {
798
808
return Err ( McapError :: AttachmentNotInProgress ) ;
799
809
}
800
810
801
- // Some Rust tricky: we can't move the writer out of self.writer,
802
- // leave that empty for a bit, and then replace it with a ChunkWriter.
803
- // (That would leave it in an unspecified state if we bailed here!)
804
- // Instead briefly swap it out for a null writer while we set up the chunker
805
- // The writer will only be None if finish() was called.
806
811
assert ! (
807
812
self . options. use_chunks,
808
813
"Trying to write to a chunk when chunking is disabled"
809
814
) ;
810
815
811
- let prev_writer = self . writer . take ( ) . expect ( Self :: WHERE_WRITER ) ;
812
-
813
- self . writer = Some ( match prev_writer {
816
+ // Rust forbids moving values out of a &mut reference. We made self.writer an Option so we
817
+ // can work around this by using take() to temporarily replace it with None while we
818
+ // construct the ChunkWriter.
819
+ self . writer = Some ( match self . writer . take ( ) . expect ( Self :: WRITER_IS_NONE ) {
814
820
WriteMode :: Raw ( w) => {
815
821
// It's chunkin time.
816
822
WriteMode :: Chunk ( ChunkWriter :: new (
@@ -832,16 +838,15 @@ impl<W: Write + Seek> Writer<W> {
832
838
833
839
/// Finish the current chunk, if we have one.
834
840
fn finish_chunk ( & mut self ) -> McapResult < & mut CountingCrcWriter < W > > {
841
+ self . assert_not_finished ( ) ;
835
842
// If we're currently writing an attachment then we're not writing a chunk. Return an
836
843
// error instead.
837
844
if let Some ( WriteMode :: Attachment ( ..) ) = self . writer {
838
845
return Err ( McapError :: AttachmentNotInProgress ) ;
839
846
}
840
847
841
- // See above
842
- let prev_writer = self . writer . take ( ) . expect ( Self :: WHERE_WRITER ) ;
843
-
844
- self . writer = Some ( match prev_writer {
848
+ // See start_chunk() for why we use take() here.
849
+ self . writer = Some ( match self . writer . take ( ) . expect ( Self :: WRITER_IS_NONE ) {
845
850
WriteMode :: Chunk ( c) => {
846
851
let ( w, mode, index) = c. finish ( ) ?;
847
852
self . chunk_indexes . push ( index) ;
@@ -862,28 +867,29 @@ impl<W: Write + Seek> Writer<W> {
862
867
///
863
868
/// Subsequent calls to other methods will panic.
864
869
pub fn finish ( & mut self ) -> McapResult < ( ) > {
865
- if self . writer . is_none ( ) {
870
+ if self . is_finished {
866
871
// We already called finish().
867
872
// Maybe we're dropping after the user called it?
868
873
return Ok ( ( ) ) ;
869
874
}
870
875
871
876
// Finish any chunk we were working on and update stats, indexes, etc.
872
877
self . finish_chunk ( ) ?;
878
+ self . is_finished = true ;
873
879
874
880
// Borrow the writer in place - the is_finished flag set above is what makes subsequent writes panic.
875
- let writer = match self . writer . take ( ) {
881
+ let writer = match & mut self . writer {
876
882
// We called finish_chunk() above, so we're back to raw writes for
877
883
// the summary section.
878
884
Some ( WriteMode :: Raw ( w) ) => w,
879
885
_ => unreachable ! ( ) ,
880
886
} ;
881
- let ( mut writer , data_section_crc) = writer. finalize ( ) ;
882
- let data_section_crc = data_section_crc . finalize ( ) ;
887
+ let data_section_crc = writer. current_checksum ( ) ;
888
+ let writer = writer . get_mut ( ) ;
883
889
884
890
// We're done with the data section!
885
891
write_record (
886
- & mut writer,
892
+ writer,
887
893
& Record :: DataEnd ( records:: DataEnd { data_section_crc } ) ,
888
894
) ?;
889
895
@@ -952,8 +958,8 @@ impl<W: Write + Seek> Writer<W> {
952
958
}
953
959
954
960
// Write all schemas.
955
- let schemas_start = summary_start;
956
961
if self . options . repeat_schemas && !all_schemas. is_empty ( ) {
962
+ let schemas_start: u64 = summary_start;
957
963
for schema in all_schemas. iter ( ) {
958
964
write_record ( & mut ccw, schema) ?;
959
965
}
@@ -1053,14 +1059,30 @@ impl<W: Write + Seek> Writer<W> {
1053
1059
ccw. write_u64 :: < LE > ( summary_start) ?;
1054
1060
ccw. write_u64 :: < LE > ( summary_offset_start) ?;
1055
1061
1056
- let ( mut writer, summary_crc) = ccw. finalize ( ) ;
1062
+ let ( writer, summary_crc) = ccw. finalize ( ) ;
1057
1063
1058
1064
writer. write_u32 :: < LE > ( summary_crc. finalize ( ) ) ?;
1059
1065
1060
1066
writer. write_all ( MAGIC ) ?;
1061
1067
writer. flush ( ) ?;
1062
1068
Ok ( ( ) )
1063
1069
}
1070
+
1071
+ /// Consumes this writer, returning the underlying stream. Unless [`Self::finish()`] was called
1072
+ /// first, the underlying stream __will not contain a complete MCAP.__
1073
+ ///
1074
+ /// Use this if you wish to handle any errors returned when the underlying stream is closed. In
1075
+ /// particular, if using [`std::fs::File`], you may wish to call [`std::fs::File::sync_all()`]
1076
+ /// to ensure all data was sent to the filesystem.
1077
+ pub fn into_inner ( mut self ) -> W {
1078
+ self . is_finished = true ;
1079
+ // Peel away all the layers of the writer to get the underlying stream.
1080
+ match self . writer . take ( ) . expect ( Self :: WRITER_IS_NONE ) {
1081
+ WriteMode :: Raw ( w) => w. finalize ( ) . 0 ,
1082
+ WriteMode :: Attachment ( w) => w. writer . finalize ( ) . 0 . finalize ( ) . 0 ,
1083
+ WriteMode :: Chunk ( w) => w. compressor . finalize ( ) . 0 . into_inner ( ) . finalize ( ) . 0 . inner ,
1084
+ }
1085
+ }
1064
1086
}
1065
1087
1066
1088
impl < W : Write + Seek > Drop for Writer < W > {
@@ -1071,8 +1093,10 @@ impl<W: Write + Seek> Drop for Writer<W> {
1071
1093
1072
1094
enum Compressor < W : Write > {
1073
1095
Null ( W ) ,
1096
+ // zstd's Encoder wrapper doesn't let us get the inner writer without calling finish(), so use
1097
+ // zio::Writer directly instead.
1074
1098
#[ cfg( feature = "zstd" ) ]
1075
- Zstd ( zstd :: Encoder < ' static , W > ) ,
1099
+ Zstd ( zio :: Writer < W , zraw :: Encoder < ' static > > ) ,
1076
1100
#[ cfg( feature = "lz4" ) ]
1077
1101
Lz4 ( lz4:: Encoder < W > ) ,
1078
1102
}
@@ -1082,7 +1106,10 @@ impl<W: Write> Compressor<W> {
1082
1106
Ok ( match self {
1083
1107
Compressor :: Null ( w) => w,
1084
1108
#[ cfg( feature = "zstd" ) ]
1085
- Compressor :: Zstd ( w) => w. finish ( ) ?,
1109
+ Compressor :: Zstd ( mut w) => {
1110
+ w. finish ( ) ?;
1111
+ w. into_inner ( ) . 0
1112
+ }
1086
1113
#[ cfg( feature = "lz4" ) ]
1087
1114
Compressor :: Lz4 ( w) => {
1088
1115
let ( output, result) = w. finish ( ) ;
@@ -1091,6 +1118,16 @@ impl<W: Write> Compressor<W> {
1091
1118
}
1092
1119
} )
1093
1120
}
1121
+
1122
+ fn into_inner ( self ) -> W {
1123
+ match self {
1124
+ Compressor :: Null ( w) => w,
1125
+ #[ cfg( feature = "zstd" ) ]
1126
+ Compressor :: Zstd ( w) => w. into_inner ( ) . 0 ,
1127
+ #[ cfg( feature = "lz4" ) ]
1128
+ Compressor :: Lz4 ( w) => w. finish ( ) . 0 ,
1129
+ }
1130
+ }
1094
1131
}
1095
1132
1096
1133
impl < W : Write > Write for Compressor < W > {
@@ -1178,10 +1215,11 @@ impl<W: Write + Seek> ChunkWriter<W> {
1178
1215
#[ cfg( feature = "zstd" ) ]
1179
1216
Some ( Compression :: Zstd ) => {
1180
1217
#[ allow( unused_mut) ]
1181
- let mut enc = zstd:: Encoder :: new ( sink, 0 ) ?;
1218
+ let mut enc = zraw:: Encoder :: with_dictionary ( 0 , & [ ] ) ?;
1219
+ // Enable multithreaded encoding on non-WASM targets.
1182
1220
#[ cfg( not( target_arch = "wasm32" ) ) ]
1183
- enc. multithread ( num_cpus:: get_physical ( ) as u32 ) ?;
1184
- Compressor :: Zstd ( enc)
1221
+ enc. set_parameter ( zraw :: CParameter :: NbWorkers ( num_cpus:: get_physical ( ) as u32 ) ) ?;
1222
+ Compressor :: Zstd ( zio :: Writer :: new ( sink , enc) )
1185
1223
}
1186
1224
#[ cfg( feature = "lz4" ) ]
1187
1225
Some ( Compression :: Lz4 ) => Compressor :: Lz4 (
@@ -1510,4 +1548,62 @@ mod tests {
1510
1548
} ;
1511
1549
assert ! ( matches!( too_many, McapError :: TooManySchemas ) ) ;
1512
1550
}
1551
+
1552
/// Writing a message after `finish()` has succeeded must panic with the
/// "finished MCAP" message rather than silently appending to the stream.
#[test]
#[should_panic(expected = "Trying to write a record on a finished MCAP")]
fn panics_if_write_called_after_finish() {
    // Build an in-memory writer and finish it immediately, before any records are written.
    let sink = std::io::Cursor::new(Vec::new());
    let mut writer = Writer::new(sink).expect("failed to construct writer");
    writer.finish().expect("failed to finish writer");

    let channel = std::sync::Arc::new(crate::Channel {
        id: 1,
        topic: "chat".into(),
        message_encoding: "json".into(),
        metadata: BTreeMap::new(),
        schema: None,
    });
    let message = crate::Message {
        channel: std::sync::Arc::clone(&channel),
        sequence: 0,
        log_time: 0,
        publish_time: 0,
        data: Cow::Owned(Vec::new()),
    };

    // This call is expected to panic; the expect() is only reached if it does not.
    writer.write(&message).expect("could not write message");
}
1577
+
1578
/// Writes a single empty message, finishes the MCAP, then recovers the underlying stream via
/// `into_inner()` and checks how many bytes were written to it.
#[test]
fn writes_message_and_checks_stream_length() {
    let sink = std::io::Cursor::new(Vec::new());
    let mut writer = Writer::new(sink).expect("failed to construct writer");

    let channel = std::sync::Arc::new(crate::Channel {
        id: 1,
        topic: "chat".into(),
        message_encoding: "json".into(),
        metadata: BTreeMap::new(),
        schema: None,
    });
    let message = crate::Message {
        channel: std::sync::Arc::clone(&channel),
        sequence: 0,
        log_time: 0,
        publish_time: 0,
        data: Cow::Owned(Vec::new()),
    };

    writer.write(&message).expect("could not write message");
    writer.finish().expect("failed to finish writer");

    // The cursor's position after finish() is the total size of the emitted MCAP.
    let mut stream = writer.into_inner();
    let output_len = stream
        .stream_position()
        .expect("failed to get stream position");
    assert_eq!(output_len, 487);
}
1513
1609
}
0 commit comments