@@ -49,7 +49,7 @@ import org.apache.openwhisk.core.scheduler.message.{
49
49
SuccessfulCreationJob
50
50
}
51
51
import org .apache .openwhisk .core .scheduler .queue .{MemoryQueueKey , MemoryQueueValue , QueuePool }
52
- import org .apache .openwhisk .core .service .WatchEndpointInserted
52
+ import org .apache .openwhisk .core .service .{ WatchEndpointInserted , WatchEndpointRemoved }
53
53
import org .apache .openwhisk .core .{ConfigKeys , WhiskConfig }
54
54
import org .junit .runner .RunWith
55
55
import org .scalamock .scalatest .MockFactory
@@ -77,6 +77,7 @@ class ContainerManagerTests
77
77
with StreamLogging {
78
78
79
79
val config = new WhiskConfig (ExecManifest .requiredProperties)
80
+ ExecManifest .initialize(config)
80
81
81
82
val testInvocationNamespace = " test-invocation-namespace"
82
83
val testNamespace = " test-namespace"
@@ -278,6 +279,7 @@ class ContainerManagerTests
278
279
)
279
280
expectGetInvokers(mockEtcd, invokers)
280
281
expectGetInvokers(mockEtcd, invokers)
282
+ expectGetInvokers(mockEtcd, invokers)
281
283
expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` twice
282
284
283
285
val mockJobManager = TestProbe ()
@@ -371,10 +373,30 @@ class ContainerManagerTests
371
373
manager ! ContainerCreation (List (msg2), 128 .MB , testInvocationNamespace)
372
374
receiver.expectMsg(s " invoker1- $msg2" )
373
375
374
- // warmed container for action1 become warmed
375
- manager ! SuccessfulCreationJob (msg1.creationId, msg1.invocationNamespace, msg1.action, msg1.revision)
376
+ // warmed container for action1 become warmed when received FailedCreationJob
377
+ manager ! FailedCreationJob (
378
+ msg1.creationId,
379
+ msg1.invocationNamespace,
380
+ msg1.action,
381
+ msg1.revision,
382
+ NoAvailableResourceInvokersError ,
383
+ " " )
376
384
manager ! ContainerCreation (List (msg1), 128 .MB , testInvocationNamespace)
377
385
receiver.expectMsg(s " invoker0- $msg1" )
386
+
387
+ // warmed container for action1 become unwarmed
388
+ manager ! WatchEndpointRemoved (
389
+ ContainerKeys .warmedPrefix,
390
+ ContainerKeys .warmedContainers(
391
+ testInvocationNamespace,
392
+ testfqn,
393
+ testRevision,
394
+ InvokerInstanceId (0 , userMemory = 0 .bytes),
395
+ ContainerId (" fake" )),
396
+ " " ,
397
+ true )
398
+ manager ! ContainerCreation (List (msg1), 128 .MB , testInvocationNamespace)
399
+ receiver.expectMsg(s " invoker2- $msg1" )
378
400
}
379
401
380
402
it should " rescheduling container creation" in {
@@ -627,6 +649,52 @@ class ContainerManagerTests
627
649
" No available invokers with resources List(fake)." ))
628
650
}
629
651
652
+ it should " choose tagged invokers when no invokers available which has no tags first" in {
653
+ val msg =
654
+ ContainerCreationMessage (
655
+ TransactionId .testing,
656
+ testInvocationNamespace,
657
+ testfqn.resolve(EntityName (" ns1" )),
658
+ testRevision,
659
+ actionMetadata,
660
+ testsid,
661
+ schedulerHost,
662
+ rpcPort)
663
+ val msg2 =
664
+ ContainerCreationMessage (
665
+ TransactionId .testing,
666
+ testInvocationNamespace,
667
+ testfqn.resolve(EntityName (" ns2" )),
668
+ testRevision,
669
+ actionMetadata,
670
+ testsid,
671
+ schedulerHost,
672
+ rpcPort)
673
+
674
+ val probe = TestProbe ()
675
+ QueuePool .put(
676
+ MemoryQueueKey (testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
677
+ MemoryQueueValue (probe.ref, true ))
678
+
679
+ val healthyInvokers : List [InvokerHealth ] =
680
+ List (InvokerHealth (InvokerInstanceId (0 , userMemory = 256 .MB , tags = Seq (" cpu" , " memory" )), Healthy ))
681
+
682
+ // there is no available invokers which has no tags, it should choose tagged invokers for msg
683
+ // and for msg2, it should return no available invokers
684
+ val pairs =
685
+ ContainerManager .schedule(healthyInvokers, List (msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB )
686
+ pairs should contain theSameElementsAs List (ScheduledPair (msg, healthyInvokers(0 ).id))
687
+
688
+ probe.expectMsg(
689
+ FailedCreationJob (
690
+ msg2.creationId,
691
+ testInvocationNamespace,
692
+ msg2.action,
693
+ testRevision,
694
+ NoAvailableInvokersError ,
695
+ " No available invokers." ))
696
+ }
697
+
630
698
it should " respect the resource policy while use resource filter" in {
631
699
val msg1 =
632
700
ContainerCreationMessage (
@@ -671,6 +739,21 @@ class ContainerManagerTests
671
739
testsid,
672
740
schedulerHost,
673
741
rpcPort)
742
+ val msg4 =
743
+ ContainerCreationMessage (
744
+ TransactionId .testing,
745
+ testInvocationNamespace,
746
+ testfqn.resolve(EntityName (" ns3" )),
747
+ testRevision,
748
+ actionMetadata.copy(
749
+ limits = action.limits.copy(memory = MemoryLimit (512 .MB )),
750
+ annotations =
751
+ Parameters (Annotations .InvokerResourcesAnnotationName , JsArray (JsString (" non-exist" ))) ++ Parameters (
752
+ Annotations .InvokerResourcesStrictPolicyAnnotationName ,
753
+ JsBoolean (false ))),
754
+ testsid,
755
+ schedulerHost,
756
+ rpcPort)
674
757
675
758
val probe = TestProbe ()
676
759
QueuePool .put(
@@ -679,7 +762,7 @@ class ContainerManagerTests
679
762
val healthyInvokers : List [InvokerHealth ] =
680
763
List (
681
764
InvokerHealth (InvokerInstanceId (0 , userMemory = 256 .MB , tags = Seq .empty[String ]), Healthy ),
682
- InvokerHealth (InvokerInstanceId (1 , userMemory = 512 .MB , tags = Seq (" cpu" , " memory" )), Healthy ))
765
+ InvokerHealth (InvokerInstanceId (1 , userMemory = 256 .MB , tags = Seq (" cpu" , " memory" )), Healthy ))
683
766
684
767
// while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
685
768
val pairs =
@@ -701,12 +784,20 @@ class ContainerManagerTests
701
784
pairs2 should contain theSameElementsAs List (ScheduledPair (msg2, healthyInvokers(0 ).id))
702
785
703
786
// while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
704
- // if there is none, then choose other invokers, here is the invoker1
787
+ // if there is none, then choose invokers with other tags, if there is still none, return no available invokers
705
788
val pairs3 = ContainerManager .schedule(
706
789
healthyInvokers.takeRight(1 ),
707
- List (msg3),
790
+ List (msg3, msg4 ),
708
791
msg3.whiskActionMetaData.limits.memory.megabytes.MB )
709
792
pairs3 should contain theSameElementsAs List (ScheduledPair (msg3, healthyInvokers(1 ).id))
793
+ probe.expectMsg(
794
+ FailedCreationJob (
795
+ msg4.creationId,
796
+ testInvocationNamespace,
797
+ msg4.action,
798
+ testRevision,
799
+ NoAvailableInvokersError ,
800
+ " No available invokers." ))
710
801
}
711
802
712
803
it should " send FailedCreationJob to queue manager when no invokers are available" in {
@@ -932,6 +1023,81 @@ class ContainerManagerTests
932
1023
933
1024
result.take(m) shouldBe result.takeRight(b)
934
1025
}
1026
+
1027
+ behavior of " warm up"
1028
+
1029
+ it should " warm up all invokers when start" in {
1030
+ val mockEtcd = mock[EtcdClient ]
1031
+
1032
+ val invokers : List [InvokerHealth ] = List (
1033
+ InvokerHealth (InvokerInstanceId (0 , userMemory = testMemory, tags = Seq .empty[String ]), Healthy ),
1034
+ InvokerHealth (InvokerInstanceId (1 , userMemory = testMemory, tags = Seq .empty[String ]), Healthy ),
1035
+ InvokerHealth (InvokerInstanceId (2 , userMemory = testMemory, tags = Seq .empty[String ]), Healthy ),
1036
+ )
1037
+ expectGetInvokers(mockEtcd, invokers)
1038
+
1039
+ val mockJobManager = TestProbe ()
1040
+ val mockWatcher = TestProbe ()
1041
+ val receiver = TestProbe ()
1042
+
1043
+ val manager =
1044
+ system.actorOf(ContainerManager
1045
+ .props(factory(mockJobManager), mockMessaging(Some (receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
1046
+
1047
+ (0 to 2 ).foreach(i => {
1048
+ receiver.expectMsgPF() {
1049
+ case msg : String if msg.contains(" warmUp" ) && msg.contains(s " invoker $i" ) => true
1050
+ case msg => false
1051
+ }
1052
+ })
1053
+ }
1054
+
1055
+ it should " warm up new invoker when new one is registered" in {
1056
+ val mockEtcd = mock[EtcdClient ]
1057
+ expectGetInvokers(mockEtcd, List .empty)
1058
+
1059
+ val mockJobManager = TestProbe ()
1060
+ val mockWatcher = TestProbe ()
1061
+ val receiver = TestProbe ()
1062
+
1063
+ val manager =
1064
+ system.actorOf(ContainerManager
1065
+ .props(factory(mockJobManager), mockMessaging(Some (receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
1066
+
1067
+ manager ! WatchEndpointInserted (
1068
+ InvokerKeys .prefix,
1069
+ InvokerKeys .health(InvokerInstanceId (0 , userMemory = 128 .MB )),
1070
+ " " ,
1071
+ true )
1072
+ receiver.expectMsgPF() {
1073
+ case msg : String if msg.contains(" warmUp" ) && msg.contains(s " invoker0 " ) => true
1074
+ case _ => false
1075
+ }
1076
+
1077
+ // shouldn't warmup again
1078
+ manager ! WatchEndpointInserted (
1079
+ InvokerKeys .prefix,
1080
+ InvokerKeys .health(InvokerInstanceId (0 , userMemory = 128 .MB )),
1081
+ " " ,
1082
+ true )
1083
+ receiver.expectNoMessage()
1084
+
1085
+ // should warmup again since invoker0 is a new one
1086
+ manager ! WatchEndpointRemoved (
1087
+ InvokerKeys .prefix,
1088
+ InvokerKeys .health(InvokerInstanceId (0 , userMemory = 128 .MB )),
1089
+ " " ,
1090
+ true )
1091
+ manager ! WatchEndpointInserted (
1092
+ InvokerKeys .prefix,
1093
+ InvokerKeys .health(InvokerInstanceId (0 , userMemory = 128 .MB )),
1094
+ " " ,
1095
+ true )
1096
+ receiver.expectMsgPF() {
1097
+ case msg : String if msg.contains(" warmUp" ) && msg.contains(s " invoker0 " ) => true
1098
+ case _ => false
1099
+ }
1100
+ }
935
1101
}
936
1102
937
1103
@ RunWith (classOf [JUnitRunner ])
0 commit comments