diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 79112a984ba..33011c8ef72 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -465,5 +465,9 @@ etcd_connect_string: "{% set ret = [] %}\ scheduler: protocol: "{{ scheduler_protocol | default('http') }}" + maxPeek: "{{ scheduler_max_peek | default(128) }}" + queueManager: + maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}" + maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}" dataManagementService: retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala index a710d60fba1..463e3f31c15 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala @@ -562,9 +562,17 @@ object LoggingMarkers { // Time that is needed to produce message in kafka val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds) + val SCHEDULER_WAIT_TIME = + LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none) def SCHEDULER_KEEP_ALIVE(leaseId: Long) = LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none) + def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none) + def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds) + def SCHEDULER_QUEUE_UPDATE(reason: String) = + LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none) + def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) = + LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none) /* * General markers */ diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index 00d876e3ad2..33fe9fd946d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -296,6 +296,7 @@ object ConfigKeys { val azBlob = "whisk.azure-blob" val schedulerMaxPeek = "whisk.scheduler.max-peek" + val schedulerQueueManager = "whisk.scheduler.queue-manager" val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention" val whiskClusterName = "whisk.cluster.name" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index ba05c17964c..9123747d908 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId, lockedArgs: Map[String, String] = Map.empty, cause: Option[ActivationId] = None, traceContext: Option[Map[String, String]] = None) - extends Message { + extends Message { override def serialize = ActivationMessage.serdes.write(this).compactPrint @@ -116,11 +116,11 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me * The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always * Right when this message is created. 
*/ -case class CombinedCompletionAndResultMessage private(override val transid: TransactionId, - response: Either[ActivationId, WhiskActivation], - override val isSystemError: Option[Boolean], - instance: InstanceId) - extends AcknowledegmentMessage(transid) { +case class CombinedCompletionAndResultMessage private (override val transid: TransactionId, + response: Either[ActivationId, WhiskActivation], + override val isSystemError: Option[Boolean], + instance: InstanceId) + extends AcknowledegmentMessage(transid) { override def messageType = "combined" override def result = Some(response) @@ -142,11 +142,11 @@ case class CombinedCompletionAndResultMessage private(override val transid: Tran * phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the * `CompletionMessage`. */ -case class CompletionMessage private(override val transid: TransactionId, - override val activationId: ActivationId, - override val isSystemError: Option[Boolean], - instance: InstanceId) - extends AcknowledegmentMessage(transid) { +case class CompletionMessage private (override val transid: TransactionId, + override val activationId: ActivationId, + override val isSystemError: Option[Boolean], + instance: InstanceId) + extends AcknowledegmentMessage(transid) { override def messageType = "completion" override def result = None @@ -168,8 +168,8 @@ case class CompletionMessage private(override val transid: TransactionId, * The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always * Right when this message is created. 
*/ -case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation]) - extends AcknowledegmentMessage(transid) { +case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation]) + extends AcknowledegmentMessage(transid) { override def messageType = "result" override def result = Some(response) @@ -253,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol { Left(value.convertTo[ActivationId]) case _: JsObject => Right(value.convertTo[WhiskActivation]) - case _ => deserializationError("could not read ResultMessage") + case _ => deserializationError("could not read ResultMessage") } } @@ -296,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol { implicit val format = new JsonFormat[EventMessageBody] { def write(eventMessageBody: EventMessageBody) = eventMessageBody match { - case m: Metric => m.toJson + case m: Metric => m.toJson case a: Activation => a.toJson } @@ -321,7 +321,7 @@ case class Activation(name: String, causedBy: Option[String], size: Option[Int] = None, userDefinedStatusCode: Option[Int] = None) - extends EventMessageBody { + extends EventMessageBody { val typeName = Activation.typeName override def serialize = toJson.compactPrint @@ -349,12 +349,12 @@ object Activation extends DefaultJsonProtocol { private implicit val durationFormat = new RootJsonFormat[Duration] { override def write(obj: Duration): JsValue = obj match { case o if o.isFinite => JsNumber(o.toMillis) - case _ => JsNumber.zero + case _ => JsNumber.zero } override def read(json: JsValue): Duration = json match { case JsNumber(n) if n <= 0 => Duration.Zero - case JsNumber(n) => toDuration(n.longValue) + case JsNumber(n) => toDuration(n.longValue) } } @@ -437,7 +437,7 @@ case class EventMessage(source: String, userId: UUID, eventType: String, timestamp: Long = System.currentTimeMillis()) - extends Message { + extends Message { override def serialize = 
EventMessage.format.write(this).compactPrint } @@ -460,7 +460,7 @@ case class InvokerResourceMessage(status: String, inProgressMemory: Long, tags: Seq[String], dedicatedNamespaces: Seq[String]) - extends Message { + extends Message { /** * Serializes message to string. Must be idempotent. @@ -502,7 +502,7 @@ object InvokerResourceMessage extends DefaultJsonProtocol { object StatusQuery case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String) - extends Message { + extends Message { override def serialize: String = StatusData.serdes.write(this).compactPrint @@ -524,7 +524,7 @@ case class ContainerCreationMessage(override val transid: TransactionId, rpcPort: Int, retryCount: Int = 0, creationId: CreationId = CreationId.generate()) - extends ContainerMessage(transid) { + extends ContainerMessage(transid) { override def toJson: JsValue = ContainerCreationMessage.serdes.write(this) @@ -556,7 +556,7 @@ case class ContainerDeletionMessage(override val transid: TransactionId, action: FullyQualifiedEntityName, revision: DocRevision, whiskActionMetaData: WhiskActionMetaData) - extends ContainerMessage(transid) { + extends ContainerMessage(transid) { override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this) override def serialize: String = toJson.compactPrint @@ -640,17 +640,17 @@ object ContainerCreationError extends Enumeration { ZeroNamespaceLimit) private def parse(name: String) = name.toUpperCase match { - case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError + case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError - case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError - case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError - case "DBFETCHERROR" => DBFetchError - case "WHISKERROR" => WhiskError - case "BLACKBOXERROR" => BlackBoxError - case "TIMEOUTERROR" => TimeoutError - case "ZERONAMESPACELIMIT" => 
ZeroNamespaceLimit - case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests - case "UNKNOWNERROR" => UnknownError + case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError + case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError + case "DBFETCHERROR" => DBFetchError + case "WHISKERROR" => WhiskError + case "BLACKBOXERROR" => BlackBoxError + case "TIMEOUTERROR" => TimeoutError + case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit + case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests + case "UNKNOWNERROR" => UnknownError } implicit val serds = new RootJsonFormat[ContainerCreationError] { @@ -678,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId, retryCount: Int = 0, error: Option[ContainerCreationError] = None, reason: Option[String] = None) - extends Message { + extends Message { /** * Serializes message to string. Must be idempotent. diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala index 258fc89f319..578ac0ba18f 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala @@ -123,15 +123,15 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize] override def equals(that: Any): Boolean = that match { case t: ByteSize => compareTo(t) == 0 - case _ => false + case _ => false } override def toString = { unit match { case SizeUnits.BYTE => s"$size B" - case SizeUnits.KB => s"$size KB" - case SizeUnits.MB => s"$size MB" - case SizeUnits.GB => s"$size GB" + case SizeUnits.KB => s"$size KB" + case SizeUnits.MB => s"$size MB" + case SizeUnits.GB => s"$size GB" } } } @@ -190,7 +190,7 @@ object size { def read(value: JsValue): ByteSize = value match { case JsString(s) => ByteSize.fromString(s) - case _ => deserializationError(formatError) + case _ => deserializationError(formatError) } } 
} diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf index cc5fa49f6fd..d9c15ac7d21 100644 --- a/core/scheduler/src/main/resources/application.conf +++ b/core/scheduler/src/main/resources/application.conf @@ -27,6 +27,11 @@ whisk { } scheduler { + queue-manager { + max-scheduling-time = "20 seconds" + max-retries-to-get-queue = "13" + } + max-peek = "128" in-progress-job-retention = "20 seconds" } } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala new file mode 100644 index 00000000000..e6f3a6b7880 --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.scheduler.queue + +import org.apache.openwhisk.core.connector.ActivationMessage +import org.apache.openwhisk.core.entity.{DocInfo, FullyQualifiedEntityName} + +import scala.concurrent.Promise + +// Events sent by the actor +case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String]) +case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo) +case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]]) +case object QueueRemovedCompleted +case object FlushPulse + +// Events received by the actor +case object Start +case object VersionUpdated +case object StopSchedulingAsOutdated diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala index 1b6b818c8f8..9a45f8ad0a6 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -17,16 +17,580 @@ package org.apache.openwhisk.core.scheduler.queue -import akka.actor.ActorRef +import java.nio.charset.StandardCharsets +import java.time.Instant + +import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSelection, PoisonPill, Props} +import akka.pattern.ask +import akka.util.Timeout +import org.apache.openwhisk.common._ +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.WarmUp.isWarmUpAction +import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector._ -import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.containerpool.Interval +import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext} +import org.apache.openwhisk.core.entity.{ActivationResponse => 
OriginActivationResponse, _} +import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys} +import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader} +import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates} +import org.apache.openwhisk.core.service._ +import pureconfig.loadConfigOrThrow import spray.json.{DefaultJsonProtocol, _} + +import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap -import scala.util.Try +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Try} +import pureconfig.generic.auto._ object QueueSize case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo) case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean) +case class UpdateMemoryQueue(oldAction: DocInfo, + newAction: FullyQualifiedEntityName, + activationMessage: ActivationMessage) +case class CreateNewQueue(activationMessage: ActivationMessage, + action: FullyQualifiedEntityName, + actionMetadata: WhiskActionMetaData) + +case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration) + +class QueueManager( + entityStore: ArtifactStore[WhiskEntity], + getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData], + etcdClient: EtcdClient, + schedulerEndpoints: SchedulerEndpoints, + schedulerId: SchedulerInstanceId, + dataManagementService: ActorRef, + watcherService: ActorRef, + ack: ActiveAck, + store: (TransactionId, WhiskActivation, UserContext) => Future[Any], + childFactory: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef, + schedulerConsumer: MessageConsumer, + queueManagerConfig: QueueManagerConfig = loadConfigOrThrow[QueueManagerConfig](ConfigKeys.schedulerQueueManager))( + implicit logging: Logging) + extends Actor { + + val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek) + + /** key: leader-key, 
value:DocRevision */ + private val initRevisionMap = TrieMap[String, DocRevision]() + + private val actorSelectionMap = TrieMap[String, ActorSelection]() + + private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]() + + private implicit val askTimeout = Timeout(5.seconds) + private implicit val ec = context.dispatcher + private implicit val system = context.system + + private val watcherName = "queue-manager" + // watch leaders and register them into actorSelectionMap + watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent)) + + override def receive: Receive = { + case request: CreateQueue if isWarmUpAction(request.fqn) => + logging.info( + this, + s"The ${request.fqn} action is an action used to connect a network level connection. So drop the message without creating a queue.") + sender ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true) + + // note: action sent from the pool balancer already includes version + case request: CreateQueue => + val receiver = sender + QueuePool.get(MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision))) match { + case Some(_) => + logging.info(this, s"Queue already exist for ${request.invocationNamespace}/${request.fqn}") + receiver ! 
CreateQueueResponse(request.invocationNamespace, request.fqn, success = true) + + case None => + logging.info(this, s"Trying to create queue for ${request.invocationNamespace}/${request.fqn}") + electLeaderAndCreateQueue(request, Some(receiver)) + } + + case msg: ElectionResult => + msg.leadership match { + case Right(EtcdLeader(key, value, lease)) => + leaderElectionCallbacks.remove(key).foreach { callback => + callback(Right(EtcdLeader(key, value, lease))) + } + + case Left(EtcdFollower(key, value)) => + leaderElectionCallbacks.remove(key).foreach { callback => + callback(Left(EtcdFollower(key, value))) + } + } + + case msg: ActivationMessage => + logging.info( + this, + s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")( + msg.transid) + + handleActivationMessage(msg) + + case UpdateMemoryQueue(oldAction, newAction, msg) => + logging.info( + this, + s"[${msg.activationId}] Update the memory queue for ${newAction.namespace}/${newAction.name}, old rev: ${oldAction.rev} new rev: ${msg.revision}, activationId: ${msg.activationId.asString}") + implicit val transid = msg.transid + QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, oldAction)) match { + case Some(memoryQueueValue) => + QueuePool.put( + MemoryQueueKey(msg.user.namespace.name.asString, oldAction), + MemoryQueueValue(memoryQueueValue.queue, false)) + memoryQueueValue.queue ! StopSchedulingAsOutdated + + case _ => + // do nothing because we will anyway create a new one + } + createNewQueue(newAction, msg) + + case CreateNewQueue(msg, action, actionMetaData) => + val memoryQueueKey = MemoryQueueKey(msg.user.namespace.name.asString, action.toDocId.asDocInfo(msg.revision)) + QueuePool.get(memoryQueueKey) match { + case Some(queue) if queue.isLeader => + queue.queue ! 
msg + logging.info(this, s"Queue for action $action is already updated, skip")(msg.transid) + case _ => + val queue = + childFactory(context, msg.user.namespace.name.asString, action, msg.revision, actionMetaData) + queue ! VersionUpdated + QueuePool.put( + MemoryQueueKey(msg.user.namespace.name.asString, action.toDocId.asDocInfo(msg.revision)), + MemoryQueueValue(queue, true)) + updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision) + queue ! msg + msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE) + } + + // leaderKey is now optional, it becomes None when the stale queue is removed + case QueueRemoved(invocationNamespace, action, leaderKey) => + (QueuePool.remove(MemoryQueueKey(invocationNamespace, action)), leaderKey) match { + case (Some(_), Some(key)) => + logging.info(this, s"Remove init revision map cause queue is removed, key: ${key}") + initRevisionMap.remove(key) + case _ => // do nothing + } + sender ! QueueRemovedCompleted // notify queue that it can stop safely + + // a Removed queue backed to Running + case QueueReactivated(invocationNamespace, action, docInfo) => + QueuePool.put(MemoryQueueKey(invocationNamespace, docInfo), MemoryQueueValue(sender(), true)) + updateInitRevisionMap(getLeaderKey(invocationNamespace, action), docInfo.rev) + + // only handle prefix watcher + case WatchEndpointInserted(_, key, value, true) => + if (key.contains("leader") && value.contains("host")) { + SchedulerEndpoints + .parse(value) + .map { endpoints => + logging.info(this, s"Endpoint inserted, key: $key, endpoints: $endpoints") + actorSelectionMap.update(key, endpoints.getRemoteRef(QueueManager.actorName)) + } + .recover { + case t => + logging.error(this, s"Unexpected error $t when put leaderKey: ${key}") + } + } + + // only handle prefix watcher + case WatchEndpointRemoved(_, key, _, true) => + if (key.contains("leader")) { + if (actorSelectionMap.contains(key)) { + logging.info(this, s"Endpoint removed for key: 
$key") + actorSelectionMap.remove(key) + } else { + logging.info(this, s"Endpoint removed for key: $key but not in this scheduler") + } + } + + case GracefulShutdown => + logging.info(this, s"Gracefully shutdown the queue manager") + + watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName) + logScheduler.cancel() + healthReporter ! PoisonPill + dataManagementService ! UnregisterData(SchedulerKeys.scheduler(schedulerId)) + + QueuePool.values.foreach { queueInfo => + //send GracefulShutdown as the queue is not outdated + queueInfo.queue ! GracefulShutdown + } + + // this is for graceful shutdown of the feed as well. + // When the scheduler endpoint is removed, there can be some unprocessed data in Kafka + // So we would wait for some time to consume all messages in Kafka + akka.pattern.after(5.seconds, system.scheduler) { + feed ! GracefulShutdown + Future.successful({}) + } + + case QueueSize => + sender ! QueuePool.size + + case StatusQuery => + val poolStatus = Future.sequence { + QueuePool.values.map(_.queue.ask(StatusQuery).mapTo[StatusData]) + } + sender ! poolStatus + + case msg => + logging.error(this, s"failed to elect a leader for ${msg}") + + } + + private def handler(bytes: Array[Byte]): Future[Unit] = { + Future( + ActivationMessage + .parse(new String(bytes, StandardCharsets.UTF_8))) + .flatMap(Future.fromTry) + .flatMap { msg => + if (isWarmUpAction(msg.action)) { + logging.info( + this, + s"The ${msg.action} action is an action used to connect a network level connection. So drop the message without executing activation") + } else { + logging.info( + this, + s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from kafka.")( + msg.transid) + handleActivationMessage(msg) + } + feed ! MessageFeed.Processed + Future.successful({}) + } + .recover { + case t: DeserializationException => + feed ! 
MessageFeed.Processed + logging.warn(this, s"Failed to parse message to ActivationMessage, ${t.getMessage}") + } + } + + private val feed = system.actorOf(Props { + new MessageFeed("activation", logging, schedulerConsumer, maxPeek, 1.second, handler) + }) + + private def updateInitRevisionMap(key: String, revision: DocRevision): Unit = { + logging.info(this, s"Update init revision map, key: ${key}, rev: ${revision.rev}") + initRevisionMap.update(key, revision) + } + + private def createNewQueue(newAction: FullyQualifiedEntityName, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Any] = { + val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch")) + + logging.info(this, s"Create a new queue for ${newAction}, rev: ${msg.revision}") + + getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty) + .map { actionMetaData: WhiskActionMetaData => + actionMetaData.toExecutableWhiskAction match { + // Always use revision got from Database, there can be 2 cases for the actionMetaData.rev + // 1. msg.revision == actionMetaData.rev => OK + // 2. msg.revision != actionMetaData.rev => the msg.revision must be empty, else an mismatch error will be + // threw, we can use the revision got from Database + case Some(_) => + self ! 
CreateNewQueue( + msg.copy(revision = actionMetaData.rev, action = msg.action.copy(version = Some(actionMetaData.version))), + newAction.copy(version = Some(actionMetaData.version)), + actionMetaData) + transid.finished(this, start, s"action is updated to ${newAction.toDocId.asDocInfo(actionMetaData.rev)}") + + case None => + val message = s"non-executable action: ${newAction} with rev: ${msg.revision} reached queueManager" + transid.failed(this, start, message) + completeErrorActivation(msg, message) + } + } + .recoverWith { + case DocumentRevisionMismatchException(_) => + logging.warn(this, s"Document revision is mismatched for ${newAction}, rev: ${msg.revision}") + createNewQueue(newAction, msg.copy(revision = DocRevision.empty)) + case t => + transid.failed( + this, + start, + s"failed to fetch action $newAction with rev: ${msg.revision}, error ${t.getMessage}") + completeErrorActivation(msg, t.getMessage) + } + } + + private def handleActivationMessage(msg: ActivationMessage): Any = { + implicit val transid = msg.transid + + // Drop the message that has not been scheduled for a long time + val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration + MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis) + + if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) { + logging.warn( + this, + s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec") + completeErrorActivation(msg, "The activation has not been processed") + } else { + QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match { + case Some(memoryQueueValue) if memoryQueueValue.isLeader => + memoryQueueValue.queue ! 
msg + case _ => + val key = QueueKeys.queue(msg.user.namespace.name.asString, msg.action.copy(version = None), true) + + initRevisionMap.get(key) match { + case Some(revision) => + if (msg.revision > revision) { + logging.warn( + this, + s"[${msg.activationId}] the action version is not matched for ${msg.action.path}/${msg.action.name}, current: ${revision}, received: ${msg.revision}") + MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch")) + val newAction = msg.action.copy(binding = None) + + self ! UpdateMemoryQueue(msg.action.toDocId.asDocInfo(revision), newAction, msg) + } else if (msg.revision < revision) { + // if revision is mismatched, the action may have been updated, + // so try again with the latest code + logging.warn( + this, + s"[${msg.activationId}] activation message with an old revision arrived, it will be replaced with the latest revision and invoked, current: ${revision}, received: ${msg.revision}") + sendActivationByLeaderKey(key, msg.copy(revision = revision)) + } else { + // The code will not run here under normal cases. it's for insurance + logging.warn( + this, + s"[${msg.activationId}] The code will not run here under normal cases, rev: ${msg.revision}") + sendActivationByLeaderKey(key, msg) + } + case None => + logging.info( + this, + s"[${msg.activationId}] the key ${key} is not in the initRevisionMap. revision: ${msg.revision}") + sendActivationByLeaderKey(key, msg) + } + } + } + } + + private def sendActivationByLeaderKey(key: String, msg: ActivationMessage)(implicit transid: TransactionId) = { + actorSelectionMap.get(key) match { + case Some(actorSelect) => + actorSelect ! 
msg + case None => + sendActivationToRemoteQueue(key, msg) + } + } + + private def sendActivationToRemoteQueue(key: String, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Any] = { + logging.info(this, s"[${msg.activationId}] send activation to remote queue, key: ${key} revision: ${msg.revision}") + + getQueueEndpoint(key) map { endPoint => + Future( + SchedulerEndpoints + .parse(endPoint)) + .flatMap(Future.fromTry) + .map(endPoint => { + val actorSelection = endPoint.getRemoteRef(QueueManager.actorName) + logging.info(this, s"add a new actor selection to a map with key: $key") + actorSelectionMap.update(key, actorSelection) + actorSelection ! msg + }) + .recoverWith { + case t => + logging.warn(this, s"[${msg.activationId}] failed to parse endpoints (${t.getMessage})") + completeErrorActivation(msg, "The activation has not been processed") + } + + } recoverWith { + case t => + logging.warn(this, s"[${msg.activationId}] activation has been dropped (${t.getMessage})") + completeErrorActivation(msg, "The activation has not been processed") + } + } + + private def getQueueEndpoint(key: String) = { + retryFuture(maxRetries = queueManagerConfig.maxRetriesToGetQueue) { + etcdClient.get(key).map { res => + res.getKvsList.asScala.headOption match { + case Some(kv) => kv.getValue.toStringUtf8 + case None => throw new Exception(s"Failed to get endpoint ($key)") + } + } + } + } + + private def retryFuture[T](maxRetries: Int = 13, + retries: Int = 1, + factor: Float = 2.0f, + initWait: Int = 1, + curWait: Int = 0)(fn: => Future[T]): Future[T] = { + fn recoverWith { + case e if retries <= maxRetries => + val wait = + if (curWait == 0) initWait + else Math.ceil(curWait * factor).toInt + akka.pattern.after(wait.milliseconds, system.scheduler) { + val message = s"${e.getMessage} retrying after ${wait}ms ($retries/$maxRetries)" + if (retries == maxRetries) { + // if number of retries reaches maxRetries, print warning level log + logging.warn(this, message) + } 
/**
 * Starts a leader election for the queue described by `request` and registers the callback that
 * creates the queue once this scheduler has been elected.
 *
 * The optional `receiver` always gets a [[CreateQueueResponse]]: unsuccessful when the action is
 * not executable, successful in every other case (elected, follower, or election already running).
 *
 * @param request  the queue creation request
 * @param receiver actor to notify about the outcome, if any
 */
private def electLeaderAndCreateQueue(request: CreateQueue, receiver: Option[ActorRef] = None) = {
  // single place that answers the (optional) requester
  def reply(created: Boolean): Unit =
    receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = created))

  if (request.whiskActionMetaData.toExecutableWhiskAction.isEmpty) {
    logging.error(this, s"non-executable action: ${request.fqn} with rev: ${request.revision} reached QueueManager")
    reply(created = false)
  } else {
    val leaderKey = getLeaderKey(request)

    if (leaderElectionCallbacks.get(leaderKey).isDefined) {
      // there is already a leader election for leaderKey, so skip it
      reply(created = true)
    } else {
      dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
      // the callback is executed after the leader election
      leaderElectionCallbacks.put(
        leaderKey, {
          case Right(EtcdLeader(_, _, _)) =>
            val queue =
              childFactory(
                context,
                request.invocationNamespace,
                request.fqn,
                request.revision,
                request.whiskActionMetaData)
            queue ! Start
            val poolKey =
              MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision))
            QueuePool.put(poolKey, MemoryQueueValue(queue, true))
            updateInitRevisionMap(leaderKey, request.revision)
            reply(created = true)

          // a follower does not create a queue, it only acknowledges the request
          case Left(EtcdFollower(_, _)) =>
            reply(created = true)
        })
    }
  }
}

/** Emits the number of leader queues in the pool as a histogram metric once per second. */
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(new Runnable {
  override def run(): Unit =
    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
})

/**
 * Periodically hands the state of this scheduler (id, number of leader queues, endpoints) to the
 * data management service, which is asked to update the stored data on change.
 */
private val healthReporter = Scheduler.scheduleWaitAtLeast(1.seconds, 1.seconds) { () =>
  val state = SchedulerStates(schedulerId, QueuePool.countLeader(), schedulerEndpoints)
  dataManagementService ! UpdateDataOnChange(SchedulerKeys.scheduler(schedulerId), state.serialize)
  Future.successful(())
}

/**
 * Completes an activation that could not be processed: a fallback activation carrying a whisk
 * error is acknowledged to the controller and then stored.
 *
 * @param activation the activation message that could not be processed
 * @param message    the error message to put into the activation response
 * @return the future of the store operation; a failed ack is only logged
 */
private def completeErrorActivation(activation: ActivationMessage, message: String): Future[Any] = {
  val fallback = generateFallbackActivation(activation, OriginActivationResponse.whiskError(message))

  // blocking invocations expect the result together with the completion
  val ackMsg =
    if (activation.blocking) CombinedCompletionAndResultMessage(activation.transid, fallback, schedulerId)
    else CompletionMessage(activation.transid, fallback, schedulerId)

  val acked = ack(
    activation.transid,
    fallback,
    activation.blocking,
    activation.rootControllerIndex,
    activation.user.namespace.uuid,
    ackMsg)
  acked.andThen {
    case Failure(t) =>
      logging.error(this, s"failed to send ack due to ${t}")
  }

  store(activation.transid, fallback, UserContext(activation.user))
}
/**
 * Builds an activation record with zero runtime for the given message; usually used for error cases.
 *
 * Start and end are both set to the current instant and the duration to 0. The record is annotated
 * with the unversioned action path and an unknown kind, plus the sequence cause and the package
 * binding when the message carries them.
 *
 * @param msg      the activation message the record is derived from
 * @param response the response to store in the record
 */
private def generateFallbackActivation(msg: ActivationMessage,
                                       response: OriginActivationResponse): WhiskActivation = {
  val timestamp = Instant.now

  val pathAnnotation =
    Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString))
  val kindAnnotation = Parameters(WhiskActivation.kindAnnotation, JsString(Exec.UNKNOWN))
  val causedBy =
    if (msg.causedBySequence) Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
    else None
  val binding =
    msg.action.binding.map(pkg => Parameters(WhiskActivation.bindingAnnotation, JsString(pkg.asString)))

  WhiskActivation(
    activationId = msg.activationId,
    namespace = msg.user.namespace.name.toPath,
    subject = msg.user.subject,
    cause = msg.cause,
    name = msg.action.name,
    version = msg.action.version.getOrElse(SemVer()),
    start = timestamp,
    end = timestamp,
    duration = Some(0),
    response = response,
    annotations = pathAnnotation ++ kindAnnotation ++ causedBy ++ binding)
}

/** Leader key of the queue requested by `request`; the action version is not part of the key. */
private def getLeaderKey(request: CreateQueue) = {
  val unversionedFqn = request.fqn.copy(version = None)
  QueueKeys.queue(request.invocationNamespace, unversionedFqn, leader = true)
}

/** Leader key of the queue for `fqn` in `invocationNamespace`; the action version is not part of the key. */
private def getLeaderKey(invocationNamespace: String, fqn: FullyQualifiedEntityName) = {
  val unversionedFqn = fqn.copy(version = None)
  QueueKeys.queue(invocationNamespace, unversionedFqn, leader = true)
}
/** Companion of the [[QueueManager]] actor: its well-known actor name and a `Props` factory. */
object QueueManager {

  /** Name used to look up the queue manager of a remote scheduler. */
  val actorName = "QueueManager"

  /**
   * Creates the `Props` of a [[QueueManager]]; all arguments are passed to the constructor
   * unchanged and in the same order.
   */
  def props(
    entityStore: ArtifactStore[WhiskEntity],
    getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
    etcdClient: EtcdClient,
    schedulerEndpoints: SchedulerEndpoints,
    schedulerId: SchedulerInstanceId,
    dataManagementService: ActorRef,
    watcherService: ActorRef,
    ack: ActiveAck,
    store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
    childFactory: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef,
    schedulerConsumer: MessageConsumer)(implicit logging: Logging): Props =
    Props(
      new QueueManager(
        entityStore,
        getWhiskActionMetaData,
        etcdClient,
        schedulerEndpoints,
        schedulerId,
        dataManagementService,
        watcherService,
        ack,
        store,
        childFactory,
        schedulerConsumer))
}
/**
 * Tests of the [[QueueManager]] actor: queue creation and leader election, handling of activations
 * whose revision differs from the running queue, retrying of endpoint lookups, dropping of stale
 * activations, queue removal/reactivation, state reporting and graceful shutdown.
 *
 * The queue manager under test is always built through `queueManagerProps` or
 * `queueManagerWithShortRetries`, so each test only names the collaborators it replaces.
 */
@RunWith(classOf[JUnitRunner])
class QueueManagerTests
    extends TestKit(ActorSystem("QueueManager"))
    with CommonVariable
    with ImplicitSender
    with FlatSpecLike
    with ScalaFutures
    with Matchers
    with MockFactory
    with BeforeAndAfterAll
    with BeforeAndAfterEach
    with StreamLogging {

  // function types accepted by QueueManager, aliased to keep the helper signatures readable
  private type MetaDataGetter =
    (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData]
  private type ActivationStore = (TransactionId, WhiskActivation, UserContext) => Future[Any]
  private type QueueFactory =
    (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef

  override def afterAll: Unit = {
    QueuePool.clear()
    TestKit.shutdownActorSystem(system)
  }
  override def beforeEach = QueuePool.clear()
  implicit val askTimeout = Timeout(5 seconds)
  implicit val ec = system.dispatcher

  val entityStore = WhiskEntityStore.datastore()

  val schedulerId = SchedulerInstanceId("0")
  val testQueueCreationMessage =
    CreateQueue(testInvocationNamespace, testFQN, testDocRevision, testActionMetaData)

  val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 2552, 8080)
  val mockConsumer = new TestConnector(s"scheduler${schedulerId.asString}", 4, true)

  val messageTransId = TransactionId(TransactionId.testing.meta.id)
  val uuid = UUID()

  val action = ExecutableWhiskAction(testEntityPath, testEntityName, testExec)
  val testLeaderKey = QueueKeys.queue(testInvocationNamespace, action.fullyQualifiedName(false), true)

  val activationMessage = newActivationMessage(action.fullyQualifiedName(true), testDocRevision)

  val activationResponse = ActivationResponse(Right(activationMessage))

  // default ack: accepts everything and reports nothing
  val ack = ackWith((_, _) => ())

  val store: (TransactionId, WhiskActivation, UserContext) => Future[Any] =
    (tid: TransactionId, activationResult: WhiskActivation, context: UserContext) => Future.successful(())

  val testLeaseId = 60

  // default queue: an actor that answers every GetActivation with the shared activation message
  val childFactory =
    (system: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) =>
      system.actorOf(Props(new Actor() {
        override def receive: Receive = {
          case GetActivation(_, _, _, _, _, _) =>
            sender ! ActivationResponse(Right(activationMessage))
        }
      }))

  // retry/scheduling limits used by the tests that construct the QueueManager with an explicit config
  private val shortRetryConfig = QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10.seconds)

  /** Non-blocking activation message for the test namespace with a freshly generated activation id. */
  private def newActivationMessage(fqn: FullyQualifiedEntityName, revision: DocRevision): ActivationMessage =
    ActivationMessage(
      messageTransId,
      fqn,
      revision,
      Identity(
        Subject(),
        Namespace(EntityName(testInvocationNamespace), uuid),
        BasicAuthenticationAuthKey(uuid, Secret()),
        Set.empty),
      ActivationId.generate(),
      ControllerInstanceId("0"),
      blocking = false,
      content = None)

  /** An ActiveAck that runs `onAck` with the activation and the acknowledgement before completing. */
  private def ackWith(onAck: (WhiskActivation, AcknowledegmentMessage) => Unit): ActiveAck = new ActiveAck {
    override def apply(tid: TransactionId,
                       activationResult: WhiskActivation,
                       blockingInvoke: Boolean,
                       controllerInstance: ControllerInstanceId,
                       userId: UUID,
                       acknowledegment: AcknowledegmentMessage): Future[Any] =
      Future.successful(onAck(activationResult, acknowledegment))
  }

  /** Props of a QueueManager wired with the shared fixtures; only the given collaborators differ. */
  private def queueManagerProps(etcdClient: EtcdClient,
                                dataManagementService: ActorRef,
                                watcherService: ActorRef,
                                getMetaData: MetaDataGetter = get,
                                ack: ActiveAck = this.ack,
                                store: ActivationStore = this.store,
                                childFactory: QueueFactory = this.childFactory): Props =
    QueueManager.props(
      entityStore,
      getMetaData,
      etcdClient,
      schedulerEndpoint,
      schedulerId,
      dataManagementService,
      watcherService,
      ack,
      store,
      childFactory,
      mockConsumer)

  /** A QueueManager constructed directly with `shortRetryConfig` instead of the configured defaults. */
  private def queueManagerWithShortRetries(etcdClient: EtcdClient,
                                           dataManagementService: ActorRef,
                                           watcherService: ActorRef,
                                           getMetaData: MetaDataGetter = get,
                                           ack: ActiveAck = this.ack) =
    TestActorRef(
      new QueueManager(
        entityStore,
        getMetaData,
        etcdClient,
        schedulerEndpoint,
        schedulerId,
        dataManagementService,
        watcherService,
        ack,
        store,
        childFactory,
        mockConsumer,
        shortRetryConfig))

  /** Asks `queueManager` to create the queue for `request` and expects a successful response. */
  private def expectQueueCreated(queueManager: ActorRef, request: CreateQueue = testQueueCreationMessage): Unit =
    (queueManager ? request).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
      request.invocationNamespace,
      request.fqn,
      success = true)

  def convertToMetaData(action: WhiskAction): WhiskActionMetaData = {
    val exec = CodeExecMetaDataAsString(RuntimeManifest(action.exec.kind, ImageName("test")), entryPoint = Some("test"))
    WhiskActionMetaData(
      action.namespace,
      action.name,
      exec,
      action.parameters,
      action.limits,
      action.version,
      action.publish,
      action.annotations)
      .revision[WhiskActionMetaData](action.rev)
  }

  /** Wraps a fixed result into a logged function with the signature of the metadata getter. */
  def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) = LoggedFunction {
    (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
      meta
  }

  val get = getWhiskActionMetaData(Future(convertToMetaData(action.toWhiskAction.revision(testDocRevision))))
  val failedGet = getWhiskActionMetaData(Future.failed(new Exception("error")))

  val watchEndpoint =
    WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, "queue-manager", Set(PutEvent, DeleteEvent))

  behavior of "QueueManager"

  it should "get the remote actor ref and send the message" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val testQueueManagerActorName = "QueueManagerActorSelectionTest"
    val watcher = TestProbe()

    system.actorOf(
      queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref),
      testQueueManagerActorName)

    watcher.expectMsg(watchEndpoint)
    val testQueueManagerPath = s"akka://QueueManager/user/${testQueueManagerActorName}"

    val selected = system.actorSelection(testQueueManagerPath)

    val ActorIdentity(_, Some(ref)) = (selected ? Identify(testQueueManagerPath)).mapTo[ActorIdentity].futureValue

    expectQueueCreated(ref)
  }

  it should "create a queue in response to a queue creation request" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val watcher = TestProbe()

    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)
  }

  it should "not create a queue if there is already a queue for the given fqn" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val watcher = TestProbe()

    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)
    dataManagementService.expectMsg(ElectLeader(testLeaderKey, schedulerEndpoint.serialize, queueManager))

    expectQueueCreated(queueManager)
  }

  it should "only do leader election for one time if there are more than one create queue requests incoming" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    dataManagementService.ignoreMsg {
      case _: UpdateDataOnChange => true
    }
    val watcher = TestProbe()

    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)

    val probe = TestProbe()
    queueManager.tell(testQueueCreationMessage, probe.ref)
    queueManager.tell(testQueueCreationMessage, probe.ref)
    queueManager.tell(testQueueCreationMessage, probe.ref)
    queueManager.tell(testQueueCreationMessage, probe.ref)

    // dataManagementService should only do one election
    dataManagementService.expectMsg(ElectLeader(testLeaderKey, schedulerEndpoint.serialize, queueManager))
    dataManagementService.expectNoMessage()

    // all four requests should get responses
    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
  }

  /** A data management service probe that answers every ElectLeader by electing the requester. */
  private def getTestDataManagementService() = {
    val dataManagementService = TestProbe()
    dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
      msg match {
        case ElectLeader(key, value, _, _) =>
          sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
          TestActor.KeepRunning

        case _ =>
          TestActor.KeepRunning
    })
    dataManagementService
  }

  it should "create a new MemoryQueue when the revision matches with the one in a datastore" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val watcher = TestProbe()

    val probe = TestProbe()

    val childFactory =
      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref

    val newRevision = DocRevision("2-test-revision")
    val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
    val newGet = getWhiskActionMetaData(
      Future(convertToMetaData(action.copy(version = SemVer(0, 0, 2)).toWhiskAction.revision(newRevision))))
    val queueManager = TestActorRef(
      queueManagerProps(
        mockEtcdClient,
        dataManagementService.ref,
        watcher.ref,
        getMetaData = newGet,
        childFactory = childFactory))

    watcher.expectMsg(watchEndpoint)
    //current queue's revision is `1-test-revision`
    expectQueueCreated(queueManager)

    probe.expectMsg(Start)

    //the activationMessage's revision(2-test-revision) is newer than current queue's revision(1-test-revision)
    val activationMessage = newActivationMessage(newFqn, newRevision)

    queueManager ! activationMessage
    // even though two requests are sent, only one queue should be created
    queueManager ! activationMessage.copy(activationId = ActivationId.generate())
    probe.expectMsg(StopSchedulingAsOutdated)
    probe.expectMsg(VersionUpdated)
    probe.expectMsg(activationMessage)
  }

  it should "create a new MemoryQueue correctly when the action is updated again during updating the queue" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val watcher = TestProbe()

    val probe = TestProbe()
    val queueWatcher = TestProbe()

    // reports every requested queue to queueWatcher before handing out the shared probe
    val childFactory =
      (_: ActorRefFactory,
       _: String,
       fqn: FullyQualifiedEntityName,
       revision: DocRevision,
       _: WhiskActionMetaData) => {
        queueWatcher.ref ! ((fqn, revision))
        probe.ref
      }

    val newRevision = DocRevision("2-test-revision")
    val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
    val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
    val finalRevision = DocRevision("3-test-revision")
    // simulate the case that the action is updated again while it is fetched from the database
    def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean) = {
      if (docRevision == DocRevision.empty) {
        Future(convertToMetaData(action.copy(version = SemVer(0, 0, 3)).toWhiskAction.revision(finalRevision)))
      } else
        Future.failed(DocumentRevisionMismatchException("mismatch"))
    }
    val queueManager = TestActorRef(
      queueManagerProps(
        mockEtcdClient,
        dataManagementService.ref,
        watcher.ref,
        getMetaData = newGet _,
        childFactory = childFactory))

    watcher.expectMsg(watchEndpoint)
    //current queue's revision is `1-test-revision`
    expectQueueCreated(queueManager)

    queueWatcher.expectMsg((testFQN, testDocRevision))
    probe.expectMsg(Start)

    //the activationMessage's revision(2-test-revision) is newer than current queue's revision(1-test-revision)
    val activationMessage = newActivationMessage(newFqn, newRevision)

    queueManager ! activationMessage
    probe.expectMsg(StopSchedulingAsOutdated)
    queueWatcher.expectMsg((finalFqn, finalRevision))
    probe.expectMsg(VersionUpdated)
    probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
  }

  it should "not skip outdated activation when the revision is older than the one in a datastore" in {
    stream.reset()
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val watcher = TestProbe()

    val probe = TestProbe()

    val childFactory =
      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref

    val newRevision = DocRevision("2-test-revision")
    val get = getWhiskActionMetaData(Future(convertToMetaData(action.toWhiskAction.revision(newRevision))))

    val queueManager = TestActorRef(
      queueManagerProps(
        mockEtcdClient,
        dataManagementService.ref,
        watcher.ref,
        getMetaData = get,
        childFactory = childFactory))

    watcher.expectMsg(watchEndpoint)
    //current queue's revision is `2-test-revision`
    val testQueueCreationMessage =
      CreateQueue(testInvocationNamespace, testFQN, revision = newRevision, testActionMetaData)

    expectQueueCreated(queueManager, testQueueCreationMessage)

    //the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision)
    queueManager ! activationMessage

    stream.toString should include(s"it will be replaced with the latest revision and invoked")
  }

  it should "retry to fetch queue data if etcd does not respond" in {
    val mockEtcdClient = stub[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    dataManagementService.ignoreMsg {
      case _: UpdateDataOnChange => true
    }
    val watcher = TestProbe()

    (mockEtcdClient.get _) when (*) returns (Future.failed(new Exception("failed to get for some reason")))

    val queueManager = queueManagerWithShortRetries(mockEtcdClient, dataManagementService.ref, watcher.ref)

    queueManager ! activationMessage
    Thread.sleep(100)
    // one initial attempt plus maxRetriesToGetQueue retries
    (mockEtcdClient.get _) verify (*) repeated (3)
  }

  it should "retry to fetch queue data if there is no data in the response" in {
    val mockEtcdClient = stub[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    dataManagementService.ignoreMsg {
      case _: UpdateDataOnChange => true
    }
    val watcher = TestProbe()

    val emptyResult = Future.successful(RangeResponse.newBuilder().build())
    (mockEtcdClient.get _) when (*) returns (emptyResult)

    val queueManager = queueManagerWithShortRetries(mockEtcdClient, dataManagementService.ref, watcher.ref)

    queueManager ! activationMessage
    Thread.sleep(100)
    // one initial attempt plus maxRetriesToGetQueue retries
    (mockEtcdClient.get _) verify (*) repeated (3)
  }

  it should "drop the activation message that has not been scheduled for a long time" in {
    val mockEtcdClient = mock[EtcdClient]
    val watcher = TestProbe()
    val probe = TestProbe()
    val dataManagementService = getTestDataManagementService()

    val ack = ackWith((_, acknowledgement) => probe.ref ! acknowledgement.isSystemError)

    // 11 seconds old, i.e. beyond the 10 seconds of maxSchedulingTime
    val oldNow = Instant.now(Clock.systemUTC()).minusMillis(11000)
    val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))

    val queueManager = queueManagerWithShortRetries(
      mockEtcdClient,
      dataManagementService.ref,
      watcher.ref,
      getMetaData = failedGet,
      ack = ack)

    // send old activation message
    queueManager ! oldActivationMessage

    // response should be whisk internal error
    probe.expectMsg(Some(true))

    stream.toString should include(s"[${activationMessage.activationId}] the activation message has not been scheduled")
  }

  it should "not drop the unscheduled activation message that has been processed within the scheduling time limit." in {
    val mockEtcdClient = mock[EtcdClient]
    val watcher = TestProbe()
    val probe = TestProbe()
    val dataManagementService = getTestDataManagementService()

    val ack = ackWith((activation, _) => probe.ref ! activation.activationId)

    // 9 seconds old, i.e. still within the 10 seconds of maxSchedulingTime
    val oldNow = Instant.now(Clock.systemUTC()).minusMillis(9000)
    val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))

    val queueManager = queueManagerWithShortRetries(
      mockEtcdClient,
      dataManagementService.ref,
      watcher.ref,
      getMetaData = failedGet,
      ack = ack)

    // send old activation message
    queueManager ! oldActivationMessage

    // no ack is expected
    probe.expectNoMessage(500.milliseconds)
  }

  it should "complete the error activation when the version of action is changed but fetch is failed" in {
    val mockEtcdClient = mock[EtcdClient]
    val watcher = TestProbe()

    val probe = TestProbe()
    val consumer = TestProbe()
    val dataManagementService = getTestDataManagementService()
    // both the ack and the store report the activation id to the probe
    val ack = ackWith((activation, _) => probe.ref ! activation.activationId)
    val store: (TransactionId, WhiskActivation, UserContext) => Future[Any] =
      (_: TransactionId, activation: WhiskActivation, _: UserContext) =>
        Future.successful(probe.ref ! activation.activationId)

    val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
    val queueManager = TestActorRef(
      queueManagerProps(
        mockEtcdClient,
        dataManagementService.ref,
        watcher.ref,
        getMetaData = failedGet,
        ack = ack,
        store = store))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)

    queueManager.tell(
      UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage),
      consumer.ref)

    probe.expectMsg(activationMessage.activationId)
    probe.expectMsg(activationMessage.activationId)
  }

  it should "remove the queue and consumer if it receives a QueueRemoved message" in {
    val mockEtcdClient = mock[EtcdClient]
    val watcher = TestProbe()
    val dataManagementService = getTestDataManagementService()
    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)

    queueManager ! QueueRemoved(
      testInvocationNamespace,
      testFQN.toDocId.asDocInfo(testDocRevision),
      Some(testLeaderKey))

    QueuePool.size shouldBe 0
  }

  it should "put the queue back to pool if it receives a QueueReactive message" in {
    val mockEtcdClient = mock[EtcdClient]
    val watcher = TestProbe()
    val dataManagementService = getTestDataManagementService()
    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)

    QueuePool.size shouldBe 1

    queueManager ! QueueRemoved(
      testInvocationNamespace,
      testFQN.toDocId.asDocInfo(testDocRevision),
      Some(testLeaderKey))

    QueuePool.size shouldBe 0

    queueManager ! QueueReactivated(testInvocationNamespace, testFQN, testFQN.toDocId.asDocInfo(testDocRevision))

    QueuePool.size shouldBe 1
  }

  it should "put pool information to data management service" in {
    val mockEtcdClient = mock[EtcdClient]

    val watcher = TestProbe()
    val dataManagementService = TestProbe()
    // one counter per reported number of leader queues
    val counter1 = new AtomicInteger(0)
    val counter2 = new AtomicInteger(0)
    val counter3 = new AtomicInteger(0)

    dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
      msg match {
        case ElectLeader(key, value, _, _) =>
          sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
          TestActor.KeepRunning

        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 1, schedulerEndpoint).serialize =>
          counter1.getAndIncrement()
          TestActor.KeepRunning

        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 2, schedulerEndpoint).serialize =>
          counter2.getAndIncrement()
          TestActor.KeepRunning

        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 3, schedulerEndpoint).serialize =>
          counter3.getAndIncrement()
          TestActor.KeepRunning

        case _ =>
          TestActor.KeepRunning
    })

    val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
    val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))

    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)

    // give the periodic reporter time to publish the state with one leader queue
    Thread.sleep(2000)

    expectQueueCreated(queueManager, testQueueCreationMessage.copy(fqn = fqn2))

    Thread.sleep(2000)

    expectQueueCreated(queueManager, testQueueCreationMessage.copy(fqn = fqn3))

    Thread.sleep(2000)

    counter1.get() should be > 0
    counter2.get() should be > 0
    counter3.get() should be > 0
  }

  it should "not create a queue if it is a warm-up action" in {
    val mockEtcdClient = mock[EtcdClient]
    val dataManagementService = getTestDataManagementService()
    val watcher = TestProbe()

    val warmUpActionMetaData =
      WhiskActionMetaData(warmUpAction.namespace.toPath, warmUpAction.name, testExecMetadata, version = semVer)

    val warmUpQueueCreationMessage =
      CreateQueue(warmUpAction.namespace.toString, warmUpAction, testDocRevision, warmUpActionMetaData)

    val queueManager = TestActorRef(queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref))

    watcher.expectMsg(watchEndpoint)

    expectQueueCreated(queueManager, warmUpQueueCreationMessage)
  }

  behavior of "zero downtime deployment"

  it should "stop all memory queues and corresponding consumers when it receives graceful shutdown message" in {
    val mockEtcdClient = mock[EtcdClient]
    val watcher = TestProbe()
    val dataManagementService = getTestDataManagementService()
    val probe = TestProbe()
    val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
    val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))

    // every queue created by this factory forwards a received GracefulShutdown to the probe
    val childFactory =
      (system: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => {
        system.actorOf(Props(new Actor() {
          override def receive: Receive = {
            case GetActivation(_, _, _, _, _, _) =>
              sender ! ActivationResponse(Right(activationMessage))

            case GracefulShutdown =>
              probe.ref ! GracefulShutdown
          }
        }))
      }

    val queueManager = TestActorRef(
      queueManagerProps(mockEtcdClient, dataManagementService.ref, watcher.ref, childFactory = childFactory))

    watcher.expectMsg(watchEndpoint)
    expectQueueCreated(queueManager)
    expectQueueCreated(queueManager, testQueueCreationMessage.copy(fqn = fqn2))
    expectQueueCreated(queueManager, testQueueCreationMessage.copy(fqn = fqn3))

    queueManager ! GracefulShutdown

    probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
  }
}