diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala index ff82ef5fb77..541aee0550a 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala @@ -601,7 +601,6 @@ object LoggingMarkers { LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none) def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none) def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds) - def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds) def SCHEDULER_QUEUE_UPDATE(reason: String) = LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none) def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) = diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala index a0a3ea18802..3c11916af30 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -54,9 +54,6 @@ case class UpdateMemoryQueue(oldAction: DocInfo, case class CreateNewQueue(activationMessage: ActivationMessage, action: FullyQualifiedEntityName, actionMetadata: WhiskActionMetaData) -case class RecoverQueue(activationMessage: ActivationMessage, - action: FullyQualifiedEntityName, - actionMetadata: WhiskActionMetaData) case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration) @@ -83,7 +80,7 @@ class QueueManager( private val actorSelectionMap = TrieMap[String, ActorSelection]() - private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]() + private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]() private implicit val askTimeout = Timeout(5.seconds) private implicit val ec = context.dispatcher @@ -93,8 +90,6 @@ class QueueManager( // watch leaders and register them into actorSelectionMap watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent)) - private var isShuttingDown = false - override def receive: Receive = { case request: CreateQueue if isWarmUpAction(request.fqn) => logging.info( @@ -119,12 +114,12 @@ class QueueManager( msg.leadership match { case Right(EtcdLeader(key, value, lease)) => leaderElectionCallbacks.remove(key).foreach { callback => - callback(Right(EtcdLeader(key, value, lease)), isShuttingDown) + callback(Right(EtcdLeader(key, value, lease))) } case Left(EtcdFollower(key, value)) => leaderElectionCallbacks.remove(key).foreach { callback => - callback(Left(EtcdFollower(key, value)), isShuttingDown) + callback(Left(EtcdFollower(key, value))) } } @@ -134,11 +129,7 @@ class QueueManager( s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")( msg.transid) - if (sender() == self) { - handleCycle(msg)(msg.transid) - } else { - handleActivationMessage(msg) - } + handleActivationMessage(msg) case UpdateMemoryQueue(oldAction, newAction, msg) => logging.info( @@ -173,25 +164,6 @@ class QueueManager( updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision) queue ! msg msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE) - if (isShuttingDown) { - queue ! GracefulShutdown - } - } - - case RecoverQueue(msg, action, actionMetaData) => - QueuePool.keys.find(k => - k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match { - // queue is already recovered or a newer queue is created, send msg to new queue - case Some(key) if key.docInfo.rev >= msg.revision => - QueuePool.get(key) match { - case Some(queue) if queue.isLeader => - queue.queue ! msg.copy(revision = key.docInfo.rev) - logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid) - case _ => - recreateQueue(action, msg, actionMetaData) - } - case _ => - recreateQueue(action, msg, actionMetaData) } // leaderKey is now optional, it becomes None when the stale queue is removed @@ -236,7 +208,6 @@ class QueueManager( } case GracefulShutdown => - isShuttingDown = true logging.info(this, s"Gracefully shutdown the queue manager") watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName) @@ -346,49 +317,6 @@ class QueueManager( } } - private def recreateQueue(action: FullyQualifiedEntityName, - msg: ActivationMessage, - actionMetaData: WhiskActionMetaData): Unit = { - logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid) - val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData) - queue ! msg - msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER) - if (isShuttingDown) { - queue ! GracefulShutdown - } - } - - private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = { - logging.warn( - this, - s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...") - val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER) - - logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},") - getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false) - .map { actionMetaData: WhiskActionMetaData => - actionMetaData.toExecutableWhiskAction match { - case Some(_) => - self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData) - transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}") - - case None => - val message = - s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager" - completeErrorActivation(msg, message) - transid.failed(this, start, message) - } - } - .recover { - case t => - transid.failed( - this, - start, - s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}") - completeErrorActivation(msg, t.getMessage) - } - } - private def handleActivationMessage(msg: ActivationMessage): Any = { implicit val transid = msg.transid @@ -525,24 +453,24 @@ class QueueManager( case None => dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self) leaderElectionCallbacks.put( - leaderKey, - (electResult, isShuttingDown) => { - electResult match { - case Right(EtcdLeader(_, _, _)) => - val queue = createAndStartQueue( - request.invocationNamespace, - request.fqn, - request.revision, - request.whiskActionMetaData) - receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) - if (isShuttingDown) { - queue ! GracefulShutdown - } - - // in case of follower, do nothing - case Left(EtcdFollower(_, _)) => - receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) - } + leaderKey, { + case Right(EtcdLeader(_, _, _)) => + val queue = childFactory( + context, + request.invocationNamespace, + request.fqn, + request.revision, + request.whiskActionMetaData) + queue ! Start + QueuePool.put( + MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)), + MemoryQueueValue(queue, true)) + updateInitRevisionMap(leaderKey, request.revision) + receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) + + // in case of follower, do nothing + case Left(EtcdFollower(_, _)) => + receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) }) // there is already a leader election for leaderKey, so skip it @@ -562,20 +490,6 @@ class QueueManager( } } - private def createAndStartQueue(invocationNamespace: String, - action: FullyQualifiedEntityName, - revision: DocRevision, - actionMetaData: WhiskActionMetaData): ActorRef = { - val queue = - childFactory(context, invocationNamespace, action, revision, actionMetaData) - queue ! Start - QueuePool.put( - MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)), - MemoryQueueValue(queue, true)) - updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision) - queue - } - private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => { MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader()) }) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala index f0a69a77122..6ad1513f754 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala @@ -549,65 +549,6 @@ class QueueManagerTests probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision)) } - it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in { - val mockEtcdClient = mock[EtcdClient] - (mockEtcdClient - .get(_: String)) - .expects(*) - .returning(Future.successful { - RangeResponse - .newBuilder() - .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build()) - .build() - }) - .anyNumberOfTimes() - val dataManagementService = getTestDataManagementService() - val watcher = TestProbe() - - val probe = TestProbe() - - val childFactory = - (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref - - val queueManager = - TestActorRef( - QueueManager - .props( - entityStore, - get, - mockEtcdClient, - schedulerEndpoint, - schedulerId, - dataManagementService.ref, - watcher.ref, - ack, - store, - childFactory, - mockConsumer)) - - watcher.expectMsg(watchEndpoint) - //current queue's revision is `1-test-revision` - (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse( - testInvocationNamespace, - testFQN, - true) - - probe.expectMsg(Start) - - // simulate queue superseded, the queue will be removed but leader key won't be deleted - queueManager ! QueueRemoved( - testInvocationNamespace, - testFQN.toDocId.asDocInfo(testDocRevision), - Some(testLeaderKey)) - - queueManager.!(activationMessage)(queueManager) - val msg2 = activationMessage.copy(activationId = ActivationId.generate()) - queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue - probe.expectMsg(Start) - probe.expectMsg(activationMessage) - probe.expectMsg(msg2) - } - it should "not skip outdated activation when the revision is older than the one in a datastore" in { stream.reset() val mockEtcdClient = mock[EtcdClient] @@ -1141,9 +1082,6 @@ class QueueManagerTests val probe = TestProbe() val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1")) val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2")) - val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3")) - val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4")) - val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5")) // probe will watch all actors which are created by these factories val childFactory = @@ -1191,14 +1129,5 @@ class QueueManagerTests queueManager ! GracefulShutdown probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown) - - // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too - (queueManager ? testQueueCreationMessage.copy(fqn = fqn4)) - .mapTo[CreateQueueResponse] - .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true) - queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData) - queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData) - - probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown) } }