@@ -47,7 +47,7 @@ import spray.json.{JsObject, JsString}
47
47
import java .time .Instant
48
48
import scala .collection .immutable .Queue
49
49
import scala .concurrent .Future
50
- import scala .concurrent .duration .DurationInt
50
+ import scala .concurrent .duration .{ DurationInt , FiniteDuration , MILLISECONDS }
51
51
import scala .language .postfixOps
52
52
53
53
@ RunWith (classOf [JUnitRunner ])
@@ -74,7 +74,6 @@ class MemoryQueueFlowTests
74
74
75
75
behavior of " MemoryQueueFlow"
76
76
77
- // this is 1. normal case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
78
77
it should " normally be created and handle an activation and became idle an finally removed" in {
79
78
val mockEtcdClient = mock[EtcdClient ]
80
79
val parent = TestProbe ()
@@ -168,7 +167,6 @@ class MemoryQueueFlowTests
168
167
probe.expectTerminated(fsm, 10 .seconds)
169
168
}
170
169
171
- // this is 1-2. normal case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
172
170
it should " became Idle and Running again if a message arrives" in {
173
171
val mockEtcdClient = mock[EtcdClient ]
174
172
val parent = TestProbe ()
@@ -282,7 +280,6 @@ class MemoryQueueFlowTests
282
280
probe.expectTerminated(fsm, 10 .seconds)
283
281
}
284
282
285
- // this is 2. NamespaceThrottled case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
286
283
it should " go to the Flushing state dropping messages when it can't create an initial container" in {
287
284
val mockEtcdClient = mock[EtcdClient ]
288
285
val parent = TestProbe ()
@@ -365,7 +362,6 @@ class MemoryQueueFlowTests
365
362
probe.expectTerminated(fsm, 10 .seconds)
366
363
}
367
364
368
- // this is 3. NamespaceThrottled case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
369
365
it should " go to the NamespaceThrottled state without dropping messages and get back to the Running container" in {
370
366
val mockEtcdClient = mock[EtcdClient ]
371
367
val parent = TestProbe ()
@@ -488,7 +484,6 @@ class MemoryQueueFlowTests
488
484
probe.expectTerminated(fsm, 10 .seconds)
489
485
}
490
486
491
- // this is 4. ActionThrottled case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
492
487
it should " go to the ActionThrottled state when there are too many stale activations including transition to NamespaceThrottling" in {
493
488
val mockEtcdClient = mock[EtcdClient ]
494
489
val parent = TestProbe ()
@@ -637,7 +632,6 @@ class MemoryQueueFlowTests
637
632
probe.expectTerminated(fsm, 10 .seconds)
638
633
}
639
634
640
- // this is 5. Paused case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
641
635
it should " be Flushing when the limit is 0 and restarted back to Running state when the limit is increased" in {
642
636
val mockEtcdClient = mock[EtcdClient ]
643
637
val parent = TestProbe ()
@@ -691,26 +685,18 @@ class MemoryQueueFlowTests
691
685
expectInitialData(watcher, dataMgmtService)
692
686
probe.expectMsg(Transition (fsm, Uninitialized , Running ))
693
687
694
- awaitAssert({
695
- ackedMessageCount shouldBe 1
696
- lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
697
- storedMessageCount shouldBe 1
698
- lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
699
- fsm.underlyingActor.queue.length shouldBe 0
700
- }, 5 .seconds)
701
-
702
688
probe.expectMsg(Transition (fsm, Running , Flushing ))
689
+ // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
690
+ Thread .sleep(flushGrace.toMillis)
691
+ fsm ! messages(1 )
703
692
704
693
awaitAssert({
705
- // in the paused state, all incoming messages should be dropped immediately
706
- fsm ! messages(1 )
707
- ackedMessageCount shouldBe 2
694
+ ackedMessageCount shouldBe 1
708
695
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
709
- storedMessageCount shouldBe 2
696
+ storedMessageCount shouldBe 1
710
697
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
711
- fsm.underlyingActor.queue.length shouldBe 0
712
- }, 5 .seconds)
713
-
698
+ fsm.underlyingActor.queue.length shouldBe 1
699
+ }, FiniteDuration (retentionTimeout, MILLISECONDS ))
714
700
// limit is increased by an operator
715
701
limit = 10
716
702
@@ -728,14 +714,12 @@ class MemoryQueueFlowTests
728
714
// Queue is now working
729
715
probe.expectMsg(Transition (fsm, Flushing , Running ))
730
716
731
- fsm ! messages(2 )
732
-
733
717
// one container is created
734
718
fsm.underlyingActor.namespaceContainerCount.existingContainerNumByNamespace += 1
735
719
736
720
// only one message is handled
737
721
container.send(fsm, getActivation(true , " testContainerId1" ))
738
- container.expectMsg(ActivationResponse (Right (messages(2 ))))
722
+ container.expectMsg(ActivationResponse (Right (messages(1 ))))
739
723
740
724
// deleting the container from containers set
741
725
container.send(fsm, getActivation(false , " testContainerId1" ))
@@ -758,7 +742,6 @@ class MemoryQueueFlowTests
758
742
probe.expectTerminated(fsm, 10 .seconds)
759
743
}
760
744
761
- // this is 5-2. Paused case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
762
745
it should " be Flushing when the limit is 0 and be terminated without recovering" in {
763
746
val mockEtcdClient = mock[EtcdClient ]
764
747
val parent = TestProbe ()
@@ -770,7 +753,7 @@ class MemoryQueueFlowTests
770
753
system.actorOf(SchedulingDecisionMaker .props(testInvocationNamespace, fqn, schedulingConfig))
771
754
772
755
// generate 2 activations
773
- val messages = getActivationMessages(3 )
756
+ val messages = getActivationMessages(2 )
774
757
775
758
val getUserLimit = (_ : String ) => Future .successful(0 )
776
759
@@ -804,35 +787,25 @@ class MemoryQueueFlowTests
804
787
registerCallback(probe, fsm)
805
788
806
789
fsm ! Start
790
+ fsm ! messages(0 )
807
791
expectInitialData(watcher, dataMgmtService)
808
792
fsm ! testInitialDataStorageResult
809
793
810
794
probe.expectMsg(Transition (fsm, Uninitialized , Running ))
811
795
812
- fsm ! messages(0 )
813
- awaitAssert({
814
- ackedMessageCount shouldBe 1
815
- lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
816
- storedMessageCount shouldBe 1
817
- lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
818
- fsm.underlyingActor.queue.length shouldBe 0
819
- }, 5 .seconds)
820
-
821
796
probe.expectMsg(Transition (fsm, Running , Flushing ))
822
-
823
- // in the paused state, all incoming messages should be dropped immediately
824
797
fsm ! messages(1 )
825
798
799
+ // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
800
+ Thread .sleep(flushGrace.toMillis)
801
+
826
802
awaitAssert({
827
803
ackedMessageCount shouldBe 2
828
804
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
829
805
storedMessageCount shouldBe 2
830
806
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (namespaceLimitUnderZero)))
831
807
fsm.underlyingActor.queue.length shouldBe 0
832
- }, 5 .seconds)
833
-
834
- // normal termination process
835
- Thread .sleep(flushGrace.toMillis * 2 )
808
+ }, FiniteDuration (retentionTimeout, MILLISECONDS ))
836
809
837
810
// In this case data clean up happens first.
838
811
expectDataCleanUp(watcher, dataMgmtService)
@@ -844,7 +817,6 @@ class MemoryQueueFlowTests
844
817
probe.expectTerminated(fsm, 10 .seconds)
845
818
}
846
819
847
- // this is 6. Waiting case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
848
820
it should " be the Flushing state when a whisk error happens" in {
849
821
val mockEtcdClient = mock[EtcdClient ]
850
822
val parent = TestProbe ()
@@ -921,13 +893,15 @@ class MemoryQueueFlowTests
921
893
922
894
fsm ! messages(1 )
923
895
896
+ Thread .sleep(flushGrace.toMillis)
897
+
924
898
awaitAssert({
925
899
ackedMessageCount shouldBe 2
926
900
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (" whisk error" )))
927
901
storedMessageCount shouldBe 2
928
902
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (" whisk error" )))
929
903
fsm.underlyingActor.queue.length shouldBe 0
930
- }, 5 .seconds )
904
+ }, FiniteDuration (retentionTimeout, MILLISECONDS ) )
931
905
932
906
Thread .sleep(flushGrace.toMillis * 2 )
933
907
@@ -941,7 +915,6 @@ class MemoryQueueFlowTests
941
915
probe.expectTerminated(fsm, 10 .seconds)
942
916
}
943
917
944
- // this is 6-2. Waiting case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
945
918
it should " be the Flushing state when a whisk error happens and be recovered when a container is created" in {
946
919
val mockEtcdClient = mock[EtcdClient ]
947
920
val parent = TestProbe ()
@@ -952,6 +925,8 @@ class MemoryQueueFlowTests
952
925
system.actorOf(SchedulingDecisionMaker .props(testInvocationNamespace, fqn, schedulingConfig))
953
926
val probe = TestProbe ()
954
927
val container = TestProbe ()
928
+ // generate 2 activations
929
+ val messages = getActivationMessages(2 )
955
930
956
931
expectDurationChecking(mockEsClient, testInvocationNamespace)
957
932
@@ -988,36 +963,39 @@ class MemoryQueueFlowTests
988
963
989
964
probe.expectMsg(Transition (fsm, Uninitialized , Running ))
990
965
991
- fsm ! message
992
- // any id is fine because it would be overridden
993
- var creationId = CreationId .generate()
966
+ fsm ! messages(0 )
994
967
968
+ // Failed to create a container
995
969
containerManager.expectMsgPF() {
996
970
case ContainerCreation (List (ContainerCreationMessage (_, _, _, _, _, _, _, _, _, id)), _, _) =>
997
- creationId = id
971
+ fsm ! FailedCreationJob (id, testInvocationNamespace, fqn, revision, WhiskError , " whisk error " )
998
972
}
999
- // Failed to create a container
1000
- fsm ! FailedCreationJob (creationId, testInvocationNamespace, fqn, revision, WhiskError , " whisk error" )
973
+
974
+ probe.expectMsg(Transition (fsm, Running , Flushing ))
975
+ Thread .sleep(1000 )
976
+ fsm ! messages(1 )
977
+
978
+ // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
979
+ Thread .sleep(flushGrace.toMillis)
1001
980
1002
981
awaitAssert({
1003
982
ackedMessageCount shouldBe 1
1004
983
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (" whisk error" )))
1005
984
storedMessageCount shouldBe 1
1006
985
lastAckedActivationResult.response.result shouldBe Some (JsObject (" error" -> JsString (" whisk error" )))
1007
- fsm.underlyingActor.queue.length shouldBe 0
1008
- }, 5 .seconds )
986
+ fsm.underlyingActor.queue.length shouldBe 1
987
+ }, FiniteDuration (retentionTimeout, MILLISECONDS ) )
1009
988
1010
- probe.expectMsg(Transition (fsm, Running , Flushing ))
1011
-
1012
- // Failed to create a container
1013
- fsm ! SuccessfulCreationJob (creationId, testInvocationNamespace, fqn, revision)
989
+ // Succeed to create a container
990
+ containerManager.expectMsgPF() {
991
+ case ContainerCreation (List (ContainerCreationMessage (_, _, _, _, _, _, _, _, _, id)), _, _) =>
992
+ fsm ! SuccessfulCreationJob (id, testInvocationNamespace, fqn, revision)
993
+ }
1014
994
1015
995
probe.expectMsg(Transition (fsm, Flushing , Running ))
1016
996
1017
- fsm ! message
1018
-
1019
997
container.send(fsm, getActivation())
1020
- container.expectMsg(ActivationResponse (Right (message )))
998
+ container.expectMsg(ActivationResponse (Right (messages( 1 ) )))
1021
999
1022
1000
// deleting the container from containers set
1023
1001
container.send(fsm, getActivation(false ))
@@ -1039,7 +1017,6 @@ class MemoryQueueFlowTests
1039
1017
probe.expectTerminated(fsm, 10 .seconds)
1040
1018
}
1041
1019
1042
- // this is 7. Flushing case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
1043
1020
it should " be the Flushing state when a developer error happens" in {
1044
1021
val mockEtcdClient = mock[EtcdClient ]
1045
1022
val parent = TestProbe ()
@@ -1143,7 +1120,6 @@ class MemoryQueueFlowTests
1143
1120
probe.expectTerminated(fsm, 10 .seconds)
1144
1121
}
1145
1122
1146
- // this is 8. GracefulShuttingDown case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
1147
1123
it should " be gracefully terminated when it receives a GracefulShutDown message" in {
1148
1124
val mockEtcdClient = mock[EtcdClient ]
1149
1125
val parent = TestProbe ()
@@ -1250,7 +1226,6 @@ class MemoryQueueFlowTests
1250
1226
probe.expectTerminated(fsm, 10 .seconds)
1251
1227
}
1252
1228
1253
- // this is 10. deprecated case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
1254
1229
it should " be deprecated when a new queue supersedes it." in {
1255
1230
// GracefulShuttingDown is not applicable
1256
1231
val allStates = List (Running , Idle , Flushing , ActionThrottled , NamespaceThrottled , Removing , Removed )
@@ -1335,7 +1310,6 @@ class MemoryQueueFlowTests
1335
1310
}
1336
1311
}
1337
1312
1338
- // this is 10-2. deprecated case in https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
1339
1313
it should " be deprecated and stops even if the queue manager could not respond." in {
1340
1314
// GracefulShuttingDown is not applicable
1341
1315
val allStates = List (Running , Idle , Flushing , ActionThrottled , NamespaceThrottled , Removing , Removed )
@@ -1564,6 +1538,20 @@ class MemoryQueueFlowTests
1564
1538
fsm ! QueueRemovedCompleted
1565
1539
probe.expectTerminated(fsm, 10 .seconds)
1566
1540
1541
+ case Flushing =>
1542
+ // queue is stale and will be removed
1543
+ parent.expectMsg(staleQueueRemovedMsg)
1544
+ probe.expectMsg(Transition (fsm, state, Removed ))
1545
+
1546
+ fsm ! QueueRemovedCompleted
1547
+
1548
+ Thread .sleep(gracefulShutdownTimeout.toMillis)
1549
+
1550
+ watcher.expectMsgAllOf(
1551
+ UnwatchEndpoint (inProgressContainerKey, isPrefix = true , watcherName),
1552
+ UnwatchEndpoint (existingContainerKey, isPrefix = true , watcherName),
1553
+ UnwatchEndpoint (leaderKey, isPrefix = false , watcherName))
1554
+
1567
1555
case _ =>
1568
1556
parent.expectMsg(staleQueueRemovedMsg)
1569
1557
parent.expectMsg(message)
0 commit comments