@@ -549,65 +549,6 @@ class QueueManagerTests
549
549
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
550
550
}
551
551
552
- it should " recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
553
- val mockEtcdClient = mock[EtcdClient ]
554
- (mockEtcdClient
555
- .get(_ : String ))
556
- .expects(* )
557
- .returning(Future .successful {
558
- RangeResponse
559
- .newBuilder()
560
- .addKvs(KeyValue .newBuilder().setKey(" test" ).setValue(schedulerEndpoint.serialize).build())
561
- .build()
562
- })
563
- .anyNumberOfTimes()
564
- val dataManagementService = getTestDataManagementService()
565
- val watcher = TestProbe ()
566
-
567
- val probe = TestProbe ()
568
-
569
- val childFactory =
570
- (_ : ActorRefFactory , _ : String , _ : FullyQualifiedEntityName , _ : DocRevision , _ : WhiskActionMetaData ) => probe.ref
571
-
572
- val queueManager =
573
- TestActorRef (
574
- QueueManager
575
- .props(
576
- entityStore,
577
- get,
578
- mockEtcdClient,
579
- schedulerEndpoint,
580
- schedulerId,
581
- dataManagementService.ref,
582
- watcher.ref,
583
- ack,
584
- store,
585
- childFactory,
586
- mockConsumer))
587
-
588
- watcher.expectMsg(watchEndpoint)
589
- // current queue's revision is `1-test-revision`
590
- (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse ].futureValue shouldBe CreateQueueResponse (
591
- testInvocationNamespace,
592
- testFQN,
593
- true )
594
-
595
- probe.expectMsg(Start )
596
-
597
- // simulate queue superseded, the queue will be removed but leader key won't be deleted
598
- queueManager ! QueueRemoved (
599
- testInvocationNamespace,
600
- testFQN.toDocId.asDocInfo(testDocRevision),
601
- Some (testLeaderKey))
602
-
603
- queueManager.! (activationMessage)(queueManager)
604
- val msg2 = activationMessage.copy(activationId = ActivationId .generate())
605
- queueManager.! (msg2)(queueManager) // even send two requests, we should only recreate one queue
606
- probe.expectMsg(Start )
607
- probe.expectMsg(activationMessage)
608
- probe.expectMsg(msg2)
609
- }
610
-
611
552
it should " not skip outdated activation when the revision is older than the one in a datastore" in {
612
553
stream.reset()
613
554
val mockEtcdClient = mock[EtcdClient ]
@@ -1141,9 +1082,6 @@ class QueueManagerTests
1141
1082
val probe = TestProbe ()
1142
1083
val fqn2 = FullyQualifiedEntityName (EntityPath (" hello1" ), EntityName (" action1" ))
1143
1084
val fqn3 = FullyQualifiedEntityName (EntityPath (" hello2" ), EntityName (" action2" ))
1144
- val fqn4 = FullyQualifiedEntityName (EntityPath (" hello3" ), EntityName (" action3" ))
1145
- val fqn5 = FullyQualifiedEntityName (EntityPath (" hello4" ), EntityName (" action4" ))
1146
- val fqn6 = FullyQualifiedEntityName (EntityPath (" hello5" ), EntityName (" action5" ))
1147
1085
1148
1086
// probe will watch all actors which are created by these factories
1149
1087
val childFactory =
@@ -1191,14 +1129,5 @@ class QueueManagerTests
1191
1129
queueManager ! GracefulShutdown
1192
1130
1193
1131
probe.expectMsgAllOf(10 .seconds, GracefulShutdown , GracefulShutdown , GracefulShutdown )
1194
-
1195
- // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
1196
- (queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
1197
- .mapTo[CreateQueueResponse ]
1198
- .futureValue shouldBe CreateQueueResponse (testInvocationNamespace, fqn = fqn4, success = true )
1199
- queueManager ! CreateNewQueue (activationMessage, fqn5, testActionMetaData)
1200
- queueManager ! RecoverQueue (activationMessage, fqn6, testActionMetaData)
1201
-
1202
- probe.expectMsgAllOf(10 .seconds, GracefulShutdown , GracefulShutdown , GracefulShutdown )
1203
1132
}
1204
1133
}
0 commit comments