14
14
15
15
use std:: fmt:: { Debug , Formatter } ;
16
16
use std:: future:: Future ;
17
+ use std:: panic:: AssertUnwindSafe ;
17
18
use std:: sync:: Arc ;
18
19
use std:: time:: Duration ;
19
20
20
- use futures:: StreamExt ;
21
+ use futures:: { FutureExt , StreamExt } ;
21
22
use minitrace:: prelude:: * ;
22
23
use parking_lot:: Mutex ;
23
24
use risingwave_common:: array:: DataChunk ;
@@ -370,7 +371,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
370
371
371
372
// Clone `self` to make compiler happy because of the move block.
372
373
let t_1 = self . clone ( ) ;
373
- let t_2 = self . clone ( ) ;
374
374
// Spawn task for real execution.
375
375
let fut = async move {
376
376
trace ! ( "Executing plan [{:?}]" , task_id) ;
@@ -394,9 +394,9 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
394
394
395
395
if let Some ( task_metrics) = task_metrics {
396
396
let monitor = TaskMonitor :: new ( ) ;
397
- let join_handle = t_2 . runtime . spawn ( monitor. instrument ( task ( task_id. clone ( ) ) ) ) ;
398
- if let Err ( join_error ) = join_handle . await && join_error . is_panic ( ) {
399
- error ! ( "Batch task {:?} panic! " , task_id) ;
397
+ let instrumented_task = AssertUnwindSafe ( monitor. instrument ( task ( task_id. clone ( ) ) ) ) ;
398
+ if let Err ( error ) = instrumented_task . catch_unwind ( ) . await {
399
+ error ! ( "Batch task {:?} panic: {:?} " , task_id, error ) ;
400
400
}
401
401
let cumulative = monitor. cumulative ( ) ;
402
402
let labels = & task_metrics. task_labels ( ) ;
@@ -425,11 +425,9 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
425
425
. task_slow_poll_duration
426
426
. with_label_values ( labels)
427
427
. set ( cumulative. total_slow_poll_duration . as_secs_f64 ( ) ) ;
428
- } else {
429
- let join_handle = t_2. runtime . spawn ( task ( task_id. clone ( ) ) ) ;
430
- if let Err ( join_error) = join_handle. await && join_error. is_panic ( ) {
431
- error ! ( "Batch task {:?} panic!" , task_id) ;
432
- }
428
+ } else if let Err ( error) = AssertUnwindSafe ( task ( task_id. clone ( ) ) ) . catch_unwind ( ) . await
429
+ {
430
+ error ! ( "Batch task {:?} panic: {:?}" , task_id, error) ;
433
431
}
434
432
} ;
435
433
0 commit comments