@@ -467,89 +467,116 @@ def test_instance_resource_decrease(self):
467
467
# explicit limit of 10 is set.
468
468
self .assertNotIn ("Resource: R1\t Count: 3" , f .read ())
469
469
470
- # Test wait for in-flight sequence completion and block new sequence
470
+ # Test sequence scheduler update
471
471
def test_sequence_instance_update (self ):
472
472
for sequence_batching_type in [
473
- "direct { }\n max_sequence_idle_microseconds: 10000000 " ,
474
- "oldest { max_candidate_sequences: 4 }\n max_sequence_idle_microseconds: 10000000 "
473
+ "direct { }\n max_sequence_idle_microseconds: 16000000 " ,
474
+ "oldest { max_candidate_sequences: 4 }\n max_sequence_idle_microseconds: 16000000 "
475
475
]:
476
476
# Load model
477
477
update_instance_group ("{\n count: 2\n kind: KIND_CPU\n }" )
478
478
update_sequence_batching (sequence_batching_type )
479
479
self .__triton .load_model (self .__model_name )
480
480
self .__check_count ("initialize" , 2 )
481
481
self .__check_count ("finalize" , 0 )
482
- # Basic sequence inference
482
+ # Test infer and update
483
+ self .__test_basic_sequence_infer_and_update ()
484
+ self .__test_concurrent_sequence_infer_and_update ()
485
+ # Unload model
486
+ self .__triton .unload_model (self .__model_name )
487
+ self .__check_count ("initialize" , 4 )
488
+ self .__check_count ("finalize" , 4 , True )
489
+ self .__reset_model ()
490
+
491
+ # Helper function for 'test_sequence_instance_update'
492
+ def __test_basic_sequence_infer_and_update (self ):
493
+ # Basic sequence inference
494
+ self .__triton .infer (self .__model_name ,
495
+ self .__get_inputs (),
496
+ sequence_id = 1 ,
497
+ sequence_start = True )
498
+ self .__triton .infer (self .__model_name ,
499
+ self .__get_inputs (),
500
+ sequence_id = 1 )
501
+ self .__triton .infer (self .__model_name ,
502
+ self .__get_inputs (),
503
+ sequence_id = 1 ,
504
+ sequence_end = True )
505
+ # Update instance without in-flight sequence
506
+ update_instance_group ("{\n count: 3\n kind: KIND_CPU\n }" )
507
+ self .__triton .load_model (self .__model_name )
508
+ self .__check_count ("initialize" , 3 )
509
+ self .__check_count ("finalize" , 0 )
510
+ # Basic sequence inference
511
+ self .__triton .infer (self .__model_name ,
512
+ self .__get_inputs (),
513
+ sequence_id = 1 ,
514
+ sequence_start = True )
515
+ self .__triton .infer (self .__model_name ,
516
+ self .__get_inputs (),
517
+ sequence_id = 1 ,
518
+ sequence_end = True )
519
+
520
+ # Helper function for 'test_sequence_instance_update'
521
+ def __test_concurrent_sequence_infer_and_update (self ):
522
+ # Start sequences 1 and 2
523
+ self .__triton .infer (self .__model_name ,
524
+ self .__get_inputs (),
525
+ sequence_id = 1 ,
526
+ sequence_start = True )
527
+ self .__triton .infer (self .__model_name ,
528
+ self .__get_inputs (),
529
+ sequence_id = 2 ,
530
+ sequence_start = True )
531
+ # Check scheduler update will wait for in-flight sequence completion
532
+ update_instance_group (
533
+ "{\n count: 1\n kind: KIND_CPU\n },\n {\n count: 1\n kind: KIND_GPU\n }" )
534
+ update_complete = [False ]
535
+
536
+ def update ():
537
+ self .__triton .load_model (self .__model_name )
538
+ update_complete [0 ] = True
539
+ self .__check_count ("initialize" , 4 )
540
+ self .__check_count ("finalize" , 2 )
541
+
542
+ with concurrent .futures .ThreadPoolExecutor () as pool :
543
+ # Update should wait until sequences 1 and 2 end
544
+ update_thread = pool .submit (update )
545
+ time .sleep (2 ) # make sure the update has started
546
+ self .assertFalse (update_complete [0 ], "Unexpected update completion" )
547
+ # Sequence 1 and 2 may continue to infer during the update
483
548
self .__triton .infer (self .__model_name ,
484
549
self .__get_inputs (),
485
- sequence_id = 1 ,
486
- sequence_start = True )
550
+ sequence_id = 1 )
487
551
self .__triton .infer (self .__model_name ,
488
552
self .__get_inputs (),
489
- sequence_id = 1 )
553
+ sequence_id = 2 )
554
+ time .sleep (4 ) # make sure the infers have started
555
+ self .assertFalse (update_complete [0 ], "Unexpected update completion" )
556
+ # Starting sequence 3 should succeed
490
557
self .__triton .infer (self .__model_name ,
491
558
self .__get_inputs (),
492
- sequence_id = 1 ,
493
- sequence_end = True )
494
- # Update instance
495
- update_instance_group ("{\n count: 4\n kind: KIND_CPU\n }" )
496
- self .__triton .load_model (self .__model_name )
497
- self .__check_count ("initialize" , 4 )
498
- self .__check_count ("finalize" , 0 )
499
- # Start an in-flight sequence
559
+ sequence_id = 3 ,
560
+ sequence_start = True )
561
+ time .sleep (2 ) # make sure the infer has started
562
+ self .assertFalse (update_complete [0 ], "Unexpected update completion" )
563
+ # Ending sequences 1 and 2 should unblock the update
500
564
self .__triton .infer (self .__model_name ,
501
565
self .__get_inputs (),
502
566
sequence_id = 1 ,
503
- sequence_start = True )
504
- # Check update instance will wait for in-flight sequence completion
505
- # and block new sequence from starting.
506
- update_instance_group ("{\n count: 3\n kind: KIND_CPU\n }" )
507
- update_complete = [False ]
508
- def update ():
509
- self .__triton .load_model (self .__model_name )
510
- update_complete [0 ] = True
511
- self .__check_count ("initialize" , 4 )
512
- self .__check_count ("finalize" , 1 )
513
- infer_complete = [False ]
514
- def infer ():
515
- self .__triton .infer (self .__model_name ,
516
- self .__get_inputs (),
517
- sequence_id = 2 ,
518
- sequence_start = True )
519
- infer_complete [0 ] = True
520
- with concurrent .futures .ThreadPoolExecutor () as pool :
521
- # Update should wait until sequence 1 end
522
- update_thread = pool .submit (update )
523
- time .sleep (2 ) # make sure update has started
524
- self .assertFalse (update_complete [0 ],
525
- "Unexpected update completion" )
526
- # New sequence should wait until update complete
527
- infer_thread = pool .submit (infer )
528
- time .sleep (2 ) # make sure infer has started
529
- self .assertFalse (infer_complete [0 ],
530
- "Unexpected infer completion" )
531
- # End sequence 1 should unblock update
532
- self .__triton .infer (self .__model_name ,
533
- self .__get_inputs (),
534
- sequence_id = 1 ,
535
- sequence_end = True )
536
- time .sleep (2 ) # make sure update has returned
537
- self .assertTrue (update_complete [0 ], "Update possibly stuck" )
538
- update_thread .result ()
539
- # Update completion should unblock new sequence
540
- time .sleep (2 ) # make sure infer has returned
541
- self .assertTrue (infer_complete [0 ], "Infer possibly stuck" )
542
- infer_thread .result ()
543
- # End sequence 2
567
+ sequence_end = True )
544
568
self .__triton .infer (self .__model_name ,
545
569
self .__get_inputs (),
546
570
sequence_id = 2 ,
547
571
sequence_end = True )
548
- # Unload model
549
- self .__triton .unload_model (self .__model_name )
550
- self .__check_count ("initialize" , 4 )
551
- self .__check_count ("finalize" , 4 , True )
552
- self .__reset_model ()
572
+ time .sleep (4 ) # make sure the update has returned
573
+ self .assertTrue (update_complete [0 ], "Update possibly stuck" )
574
+ update_thread .result ()
575
+ # End sequence 3
576
+ self .__triton .infer (self .__model_name ,
577
+ self .__get_inputs (),
578
+ sequence_id = 3 ,
579
+ sequence_end = True )
553
580
554
581
555
582
if __name__ == "__main__" :
0 commit comments