2
2
3
3
use std:: {
4
4
collections:: BTreeMap ,
5
- io :: Write ,
5
+ process :: ChildStdin ,
6
6
sync:: { atomic:: AtomicBool , Arc } ,
7
7
} ;
8
8
@@ -12,6 +12,7 @@ use ffmpeg_sidecar::{
12
12
command:: FfmpegCommand ,
13
13
event:: { FfmpegEvent , LogLevel } ,
14
14
} ;
15
+ use parking_lot:: Mutex ;
15
16
16
17
use crate :: Time ;
17
18
@@ -87,6 +88,36 @@ enum FfmpegFrameData {
87
88
EndOfStream ,
88
89
}
89
90
91
+ /// Wraps an stdin with a shared shutdown boolean.
92
+ struct StdinWithShutdown {
93
+ shutdown : Arc < AtomicBool > ,
94
+ stdin : ChildStdin ,
95
+ }
96
+
97
+ impl StdinWithShutdown {
98
+ // Don't use `std::io::ErrorKind::Interrupted` because it has special meaning for default implementations of the `Write` trait,
99
+ // causing it to continue.
100
+ const SHUTDOWN_ERROR_KIND : std:: io:: ErrorKind = std:: io:: ErrorKind :: Other ;
101
+ }
102
+
103
+ impl std:: io:: Write for StdinWithShutdown {
104
+ fn write ( & mut self , buf : & [ u8 ] ) -> std:: io:: Result < usize > {
105
+ if self . shutdown . load ( std:: sync:: atomic:: Ordering :: Acquire ) {
106
+ Err ( std:: io:: Error :: new ( Self :: SHUTDOWN_ERROR_KIND , "shutdown" ) )
107
+ } else {
108
+ self . stdin . write ( buf)
109
+ }
110
+ }
111
+
112
+ fn flush ( & mut self ) -> std:: io:: Result < ( ) > {
113
+ if self . shutdown . load ( std:: sync:: atomic:: Ordering :: Acquire ) {
114
+ Err ( std:: io:: Error :: new ( Self :: SHUTDOWN_ERROR_KIND , "shutdown" ) )
115
+ } else {
116
+ self . stdin . flush ( )
117
+ }
118
+ }
119
+ }
120
+
90
121
struct FfmpegProcessAndListener {
91
122
ffmpeg : FfmpegChild ,
92
123
@@ -100,7 +131,10 @@ struct FfmpegProcessAndListener {
100
131
write_thread : Option < std:: thread:: JoinHandle < ( ) > > ,
101
132
102
133
/// If true, the write thread will not report errors. Used upon exit, so the write thread won't log spam on the hung up stdin.
103
- suppress_write_error_reports : Arc < AtomicBool > ,
134
+ stdin_shutdown : Arc < AtomicBool > ,
135
+
136
+ /// On output instance used by the threads.
137
+ on_output : Arc < Mutex < Option < Arc < OutputCallback > > > > ,
104
138
}
105
139
106
140
impl FfmpegProcessAndListener {
@@ -151,6 +185,12 @@ impl FfmpegProcessAndListener {
151
185
let ( frame_info_tx, frame_info_rx) = crossbeam:: channel:: unbounded ( ) ;
152
186
let ( frame_data_tx, frame_data_rx) = crossbeam:: channel:: unbounded ( ) ;
153
187
188
+ let stdin_shutdown = Arc :: new ( AtomicBool :: new ( false ) ) ;
189
+
190
+ // Mutex protect `on_output` so that we can shut down the threads at a defined point in time at which we
191
+ // no longer receive any new frames or errors from this process.
192
+ let on_output = Arc :: new ( Mutex :: new ( Some ( on_output) ) ) ;
193
+
154
194
let listen_thread = std:: thread:: Builder :: new ( )
155
195
. name ( format ! ( "ffmpeg-reader for {debug_name}" ) )
156
196
. spawn ( {
@@ -166,20 +206,21 @@ impl FfmpegProcessAndListener {
166
206
}
167
207
} )
168
208
. expect ( "Failed to spawn ffmpeg listener thread" ) ;
169
-
170
- let suppress_write_error_reports = Arc :: new ( AtomicBool :: new ( false ) ) ;
171
209
let write_thread = std:: thread:: Builder :: new ( )
172
210
. name ( format ! ( "ffmpeg-writer for {debug_name}" ) )
173
211
. spawn ( {
212
+ let on_output = on_output. clone ( ) ;
174
213
let ffmpeg_stdin = ffmpeg. take_stdin ( ) . ok_or ( Error :: NoStdin ) ?;
175
- let suppress_write_error_reports = suppress_write_error_reports. clone ( ) ;
214
+ let mut ffmpeg_stdin = StdinWithShutdown {
215
+ stdin : ffmpeg_stdin,
216
+ shutdown : stdin_shutdown. clone ( ) ,
217
+ } ;
176
218
move || {
177
219
write_ffmpeg_input (
178
- ffmpeg_stdin,
220
+ & mut ffmpeg_stdin,
179
221
& frame_data_rx,
180
222
on_output. as_ref ( ) ,
181
223
& avcc,
182
- & suppress_write_error_reports,
183
224
) ;
184
225
}
185
226
} )
@@ -191,38 +232,65 @@ impl FfmpegProcessAndListener {
191
232
frame_data_tx,
192
233
listen_thread : Some ( listen_thread) ,
193
234
write_thread : Some ( write_thread) ,
194
- suppress_write_error_reports,
235
+ stdin_shutdown,
236
+ on_output,
195
237
} )
196
238
}
197
239
}
198
240
199
241
impl Drop for FfmpegProcessAndListener {
200
242
fn drop ( & mut self ) {
201
243
re_tracing:: profile_function!( ) ;
202
- self . suppress_write_error_reports
203
- . store ( true , std:: sync:: atomic:: Ordering :: Relaxed ) ;
244
+
245
+ // Stop all outputs from being written to - any attempt from here on out will fail and cause thread shutdown.
246
+ // This way, we ensure all ongoing writes are finished and won't get any more on_output callbacks from this process
247
+ // before we take any other action on the shutdown sequence.
248
+ {
249
+ self . on_output . lock ( ) . take ( ) ;
250
+ }
251
+
252
+ // Notify (potentially wake up) the stdin write thread to stop it (it might be sleeping).
204
253
self . frame_data_tx . send ( FfmpegFrameData :: EndOfStream ) . ok ( ) ;
254
+ // Kill stdin for the write thread. This helps cancelling ongoing stream write operations.
255
+ self . stdin_shutdown
256
+ . store ( true , std:: sync:: atomic:: Ordering :: Release ) ;
257
+
258
+ // Kill the ffmpeg process itself.
259
+ // This should wake up the listen thread if it is sleeping, but that may take a while.
205
260
self . ffmpeg . kill ( ) . ok ( ) ;
206
261
207
- if let Some ( write_thread) = self . write_thread . take ( ) {
208
- if write_thread. join ( ) . is_err ( ) {
209
- re_log:: error!( "Failed to join ffmpeg listener thread." ) ;
262
+ // Unfortunately, even with the above measures, it can still happen that the listen threads take occasionally 100ms and more to shut down.
263
+ // (very much depending on the system & OS, typical times may be low with large outliers)
264
+ // It is crucial that the threads come down eventually and rather timely so to avoid leaking resources.
265
+ // However, in order to avoid stalls, we'll let them finish in parallel.
266
+ //
267
+ // Since we disconnected the `on_output` callback from them, they won't influence any new instances.
268
+ if false {
269
+ {
270
+ re_tracing:: profile_scope!( "shutdown write thread" ) ;
271
+ if let Some ( write_thread) = self . write_thread . take ( ) {
272
+ if write_thread. join ( ) . is_err ( ) {
273
+ re_log:: error!( "Failed to join ffmpeg listener thread." ) ;
274
+ }
275
+ }
210
276
}
211
- }
212
- if let Some ( listen_thread) = self . listen_thread . take ( ) {
213
- if listen_thread. join ( ) . is_err ( ) {
214
- re_log:: error!( "Failed to join ffmpeg listener thread." ) ;
277
+ {
278
+ re_tracing:: profile_scope!( "shutdown listen thread" ) ;
279
+ if let Some ( listen_thread) = self . listen_thread . take ( ) {
280
+ if listen_thread. join ( ) . is_err ( ) {
281
+ re_log:: error!( "Failed to join ffmpeg listener thread." ) ;
282
+ }
283
+ }
215
284
}
216
285
}
217
286
}
218
287
}
219
288
220
289
fn write_ffmpeg_input (
221
- mut ffmpeg_stdin : std:: process :: ChildStdin ,
290
+ ffmpeg_stdin : & mut dyn std:: io :: Write ,
222
291
frame_data_rx : & Receiver < FfmpegFrameData > ,
223
- on_output : & OutputCallback ,
292
+ on_output : & Mutex < Option < Arc < OutputCallback > > > ,
224
293
avcc : & re_mp4:: Avc1Box ,
225
- suppress_write_error_reports : & AtomicBool ,
226
294
) {
227
295
let mut state = NaluStreamState :: default ( ) ;
228
296
@@ -232,19 +300,18 @@ fn write_ffmpeg_input(
232
300
FfmpegFrameData :: EndOfStream => break ,
233
301
} ;
234
302
235
- if let Err ( err) =
236
- write_avc_chunk_to_nalu_stream ( avcc, & mut ffmpeg_stdin, & chunk, & mut state)
237
- {
238
- let write_error = matches ! ( err, Error :: FailedToWriteToFfmpeg ( _) ) ;
239
- if !write_error
240
- || !suppress_write_error_reports. load ( std:: sync:: atomic:: Ordering :: Relaxed )
241
- {
242
- ( on_output) ( Err ( err. into ( ) ) ) ;
243
- }
303
+ if let Err ( err) = write_avc_chunk_to_nalu_stream ( avcc, ffmpeg_stdin, & chunk, & mut state) {
304
+ let on_output = on_output. lock ( ) ;
305
+ if let Some ( on_output) = on_output. as_ref ( ) {
306
+ let write_error = matches ! ( err, Error :: FailedToWriteToFfmpeg ( _) ) ;
307
+ on_output ( Err ( err. into ( ) ) ) ;
244
308
245
- // This is unlikely to improve! Ffmpeg process likely died.
246
- // By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually.
247
- if write_error {
309
+ if write_error {
310
+ // This is unlikely to improve! Ffmpeg process likely died.
311
+ // By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually.
312
+ return ;
313
+ }
314
+ } else {
248
315
return ;
249
316
}
250
317
} else {
@@ -257,8 +324,8 @@ fn read_ffmpeg_output(
257
324
debug_name : & str ,
258
325
ffmpeg_iterator : ffmpeg_sidecar:: iter:: FfmpegIterator ,
259
326
frame_info_rx : & Receiver < FfmpegFrameInfo > ,
260
- on_output : & OutputCallback ,
261
- ) {
327
+ on_output : & Mutex < Option < Arc < OutputCallback > > > ,
328
+ ) -> Option < ( ) > {
262
329
/// Ignore some common output from ffmpeg:
263
330
fn should_ignore_log_msg ( msg : & str ) -> bool {
264
331
let patterns = [
@@ -310,19 +377,18 @@ fn read_ffmpeg_output(
310
377
}
311
378
312
379
FfmpegEvent :: Log ( LogLevel :: Error , msg) => {
313
- on_output ( Err ( Error :: Ffmpeg ( msg) . into ( ) ) ) ;
380
+ ( on_output. lock ( ) . as_ref ( ) ? ) ( Err ( Error :: Ffmpeg ( msg) . into ( ) ) ) ;
314
381
}
315
382
316
383
FfmpegEvent :: Log ( LogLevel :: Fatal , msg) => {
317
- on_output ( Err ( Error :: FfmpegFatal ( msg) . into ( ) ) ) ;
318
- return ;
384
+ ( on_output. lock ( ) . as_ref ( ) ?) ( Err ( Error :: FfmpegFatal ( msg) . into ( ) ) ) ;
319
385
}
320
386
321
387
FfmpegEvent :: Log ( LogLevel :: Unknown , msg) => {
322
388
if msg. contains ( "system signals, hard exiting" ) {
323
389
// That was probably us, killing the process.
324
390
re_log:: debug!( "FFmpeg process for {debug_name} was killed" ) ;
325
- return ;
391
+ return None ;
326
392
}
327
393
if !should_ignore_log_msg ( & msg) {
328
394
re_log:: warn_once!( "{debug_name} decoder: {msg}" ) ;
@@ -336,7 +402,7 @@ fn read_ffmpeg_output(
336
402
337
403
FfmpegEvent :: Error ( error) => {
338
404
// An error in ffmpeg sidecar itself, rather than ffmpeg.
339
- on_output ( Err ( Error :: FfmpegSidecar ( error) . into ( ) ) ) ;
405
+ ( on_output. lock ( ) . as_ref ( ) ? ) ( Err ( Error :: FfmpegSidecar ( error) . into ( ) ) ) ;
340
406
}
341
407
342
408
FfmpegEvent :: ParsedInput ( input) => {
@@ -423,7 +489,7 @@ fn read_ffmpeg_output(
423
489
re_log:: debug!(
424
490
"{debug_name} ffmpeg decoder frame info channel disconnected"
425
491
) ;
426
- return ;
492
+ return None ;
427
493
} ;
428
494
429
495
// If the decodetimestamp did not increase, we're probably seeking backwards!
@@ -458,7 +524,7 @@ fn read_ffmpeg_output(
458
524
debug_assert_eq ! ( pix_fmt, "rgb24" ) ;
459
525
debug_assert_eq ! ( width as usize * height as usize * 3 , data. len( ) ) ;
460
526
461
- on_output ( Ok ( super :: Frame {
527
+ ( on_output. lock ( ) . as_ref ( ) ? ) ( Ok ( super :: Frame {
462
528
content : super :: FrameContent {
463
529
data,
464
530
width,
@@ -476,7 +542,7 @@ fn read_ffmpeg_output(
476
542
FfmpegEvent :: Done => {
477
543
// This happens on `pkill ffmpeg`, for instance.
478
544
re_log:: debug!( "{debug_name}'s ffmpeg is Done" ) ;
479
- return ;
545
+ return None ;
480
546
}
481
547
482
548
FfmpegEvent :: ParsedVersion ( ffmpeg_version) => {
@@ -497,11 +563,13 @@ fn read_ffmpeg_output(
497
563
FfmpegEvent :: OutputChunk ( _) => {
498
564
// Something went seriously wrong if we end up here.
499
565
re_log:: error!( "Unexpected ffmpeg output chunk for {debug_name}" ) ;
500
- on_output ( Err ( Error :: UnexpectedFfmpegOutputChunk . into ( ) ) ) ;
501
- return ;
566
+ ( on_output. lock ( ) . as_ref ( ) ? ) ( Err ( Error :: UnexpectedFfmpegOutputChunk . into ( ) ) ) ;
567
+ return None ;
502
568
}
503
569
}
504
570
}
571
+
572
+ Some ( ( ) )
505
573
}
506
574
507
575
/// Decode H.264 video via ffmpeg over CLI
@@ -606,20 +674,12 @@ fn write_avc_chunk_to_nalu_stream(
606
674
// Otherwise the decoder is not able to get the necessary information about how the video stream is encoded.
607
675
if chunk. is_sync && !state. previous_frame_was_idr {
608
676
for sps in & avcc. sequence_parameter_sets {
609
- nalu_stream
610
- . write_all ( NAL_START_CODE )
611
- . map_err ( Error :: FailedToWriteToFfmpeg ) ?;
612
- nalu_stream
613
- . write_all ( & sps. bytes )
614
- . map_err ( Error :: FailedToWriteToFfmpeg ) ?;
677
+ write_bytes ( nalu_stream, NAL_START_CODE ) ?;
678
+ write_bytes ( nalu_stream, & sps. bytes ) ?;
615
679
}
616
680
for pps in & avcc. picture_parameter_sets {
617
- nalu_stream
618
- . write_all ( NAL_START_CODE )
619
- . map_err ( Error :: FailedToWriteToFfmpeg ) ?;
620
- nalu_stream
621
- . write_all ( & pps. bytes )
622
- . map_err ( Error :: FailedToWriteToFfmpeg ) ?;
681
+ write_bytes ( nalu_stream, NAL_START_CODE ) ?;
682
+ write_bytes ( nalu_stream, & pps. bytes ) ?;
623
683
}
624
684
state. previous_frame_was_idr = true ;
625
685
} else {
0 commit comments