diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index c7ec2066287..1755abea035 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -295,6 +295,7 @@ object ConfigKeys { val azBlob = "whisk.azure-blob" val schedulerMaxPeek = "whisk.scheduler.max-peek" + val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention" val whiskClusterName = "whisk.cluster.name" diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf index fc62ef02fb9..cc5fa49f6fd 100644 --- a/core/scheduler/src/main/resources/application.conf +++ b/core/scheduler/src/main/resources/application.conf @@ -15,7 +15,7 @@ # limitations under the License. # -whisk{ +whisk { # tracing configuration tracing { component = "Scheduler" @@ -25,4 +25,8 @@ whisk{ managed-fraction: 90% blackbox-fraction: 10% } + + scheduler { + in-progress-job-retention = "20 seconds" + } } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala index e8ce26f2adb..a940f42f8fe 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala @@ -18,7 +18,6 @@ package org.apache.openwhisk.core.scheduler.container import java.nio.charset.StandardCharsets import java.util.concurrent.ThreadLocalRandom - import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} import akka.event.Logging.InfoLevel import org.apache.kafka.clients.producer.RecordMetadata @@ -33,7 +32,6 @@ import org.apache.openwhisk.core.entity.size._ 
import org.apache.openwhisk.core.entity.{ Annotations, ByteSize, - DocInfo, DocRevision, FullyQualifiedEntityName, InvokerInstanceId, @@ -55,6 +53,7 @@ import org.apache.openwhisk.core.scheduler.message.{ ReschedulingCreationJob, SuccessfulCreationJob } +import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool} import org.apache.openwhisk.core.service.{ DeleteEvent, PutEvent, @@ -570,28 +569,3 @@ object ContainerManager { } case class NoCapacityException(msg: String) extends Exception(msg) - -/** - * TODO This needs to be moved to the QueueManager component that will be added later. - */ -object QueuePool { - private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]() - - private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key) - - private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value) - - private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key) - - private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader) - - private[scheduler] def clear() = _queuePool.clear() - - private[scheduler] def size = _queuePool.size - - private[scheduler] def values = _queuePool.values - - private[scheduler] def keys = _queuePool.keys -} -case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo) -case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala new file mode 100644 index 00000000000..2cf2a710581 --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler.container + +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit +import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props} +import org.apache.openwhisk.common.{GracefulShutdown, Logging} +import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer +import org.apache.openwhisk.core.service.{RegisterData, UnregisterData} +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.scheduler.message.{ + CreationJobState, + FailedCreationJob, + FinishCreationJob, + RegisterCreationJob, + ReschedulingCreationJob, + SuccessfulCreationJob +} +import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool} +import pureconfig.loadConfigOrThrow + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} + +case object GetPoolStatus + +case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable) + +class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef, + schedulerInstanceId: SchedulerInstanceId, + 
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) + extends Actor { + private implicit val ec: ExecutionContext = actorSystem.dispatcher + private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond) + private val retryLimit = 5 + private val retryDelayTime = 100.milliseconds + + /** + * Local pool of in-progress creation jobs keyed by CreationId; each JobEntry holds the timer that fires when no ack arrives in time. + * Registering a CreationId that is already present keeps the existing JobEntry (getOrElseUpdate), so no second timer is created. + * deleteJob removes the entry, cancels its timer and sends the final state to the parent and to the leader memory queue. + * If there is no corresponding JobEntry at the time of deletion, no state is sent, but UnregisterData is still sent for the key. + */ + protected val creationJobPool = TrieMap[CreationId, JobEntry]() + + override def receive: Receive = { + case RegisterCreationJob( + ContainerCreationMessage(_, invocationNamespace, action, revision, actionMetaData, _, _, _, _, creationId)) => + val isBlackboxInvocation = actionMetaData.toExecutableWhiskAction.exists(a => a.exec.pull) + registerJob(invocationNamespace, action, revision, creationId, isBlackboxInvocation) + + case FinishCreationJob( + ContainerCreationAckMessage( + tid, + creationId, + invocationNamespace, + action, + revision, + actionMetaData, + _, + schedulerHost, + rpcPort, + retryCount, + error, + reason)) => + if (error.isEmpty) { + logging.info(this, s"[$creationId] create container successfully") + deleteJob( + invocationNamespace, + action, + revision, + creationId, + SuccessfulCreationJob(creationId, invocationNamespace, action, revision)) + + } else { + val cause = reason.getOrElse("unknown reason") + // fail the job if the retry limit is reached or the error is not one of ContainerCreationError.whiskErrors (only those are rescheduled) + if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) { + logging.error( + this, + s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. 
Finished creation") + // Delete from the pool once no further retry will be attempted + deleteJob( + invocationNamespace, + action, + revision, + creationId, + FailedCreationJob(creationId, invocationNamespace, action, revision, error.get, cause)) + } else { + // Reschedule + logging.error( + this, + s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling") + // Wait for retryDelayTime before asking the parent to reschedule, because an etcd put operation needs some time if data is inconsistent + actorSystem.scheduler.scheduleOnce(retryDelayTime) { + context.parent ! ReschedulingCreationJob( + tid, + creationId, + invocationNamespace, + action, + revision, + actionMetaData, + schedulerHost, + rpcPort, + retryCount) + } + } + } + + case GracefulShutdown => + ackFeed ! GracefulShutdown + } + + private def registerJob(invocationNamespace: String, + action: FullyQualifiedEntityName, + revision: DocRevision, + creationId: CreationId, + isBlackboxInvocation: Boolean) = { + creationJobPool getOrElseUpdate (creationId, { + val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId) + dataManagementService ! RegisterData(key, "", failoverEnabled = false) + JobEntry(action, createTimer(invocationNamespace, action, revision, creationId, isBlackboxInvocation)) + }) + } + + private def deleteJob(invocationNamespace: String, + action: FullyQualifiedEntityName, + revision: DocRevision, + creationId: CreationId, + state: CreationJobState) = { + val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId) + + // If there is a JobEntry, remove it, report the final state and cancel its timer. + creationJobPool + .remove(creationId) + .foreach(entry => { + sendState(state) + entry.timer.cancel() + }) + + dataManagementService ! UnregisterData(key) + Future.successful({}) + } + + private def sendState(state: CreationJobState): Unit = { + context.parent ! 
state // send state to ContainerManager + QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match { + case Some(memoryQueueValue) if memoryQueueValue.isLeader => + memoryQueueValue.queue ! state + case _ => + logging.error(this, s"get a $state for a nonexistent memory queue or a follower") + } + } + + protected def createTimer(invocationNamespace: String, + action: FullyQualifiedEntityName, + revision: DocRevision, + creationId: CreationId, + isBlackbox: Boolean): Cancellable = { + val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout + actorSystem.scheduler.scheduleOnce(timeout) { + logging.warn( + this, + s"Failed to create a container for $action(blackbox: $isBlackbox), error: $creationId timed out after $timeout") + creationJobPool + .remove(creationId) + .foreach( + _ => + sendState( + FailedCreationJob( + creationId, + invocationNamespace, + action, + revision, + ContainerCreationError.TimeoutError, + s"timeout waiting for the ack of $creationId after $timeout"))) + dataManagementService ! UnregisterData( + inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)) + } + } + + private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}" + private val maxActiveAcksPerPoll = 128 + private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement) + + def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = { + Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8))) + .flatMap(Future.fromTry) + .flatMap { msg => + // forward msg to job manager + self ! FinishCreationJob(msg) + ackFeed ! MessageFeed.Processed + Future.successful(()) + } + .recoverWith { + case t => + // Iff everything above failed, we have a terminal error at hand. 
Either the message failed + // to deserialize, or something threw an error where it is not expected to throw. + ackFeed ! MessageFeed.Processed + logging.error(this, s"terminal failure while processing container creation ack message: $t") + Future.successful(()) + } + } +} + +object CreationJobManager { + def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef, + schedulerInstanceId: SchedulerInstanceId, + dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) = + Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService)) +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala index aa688320138..a8b747ed3d9 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala @@ -50,6 +50,7 @@ import org.apache.openwhisk.core.scheduler.message.{ ReschedulingCreationJob, SuccessfulCreationJob } +import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool} import org.apache.openwhisk.core.service.WatchEndpointInserted import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.junit.runner.RunWith @@ -59,8 +60,8 @@ import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers} import pureconfig.loadConfigOrThrow import spray.json.{JsArray, JsBoolean, JsString} - import pureconfig.generic.auto._ + import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration.{FiniteDuration, _} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala new file mode 100644 index 00000000000..243a58ec981 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.scheduler.container.test + +import java.util.concurrent.TimeUnit +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import com.ibm.etcd.client.{EtcdClient => Client} +import common.StreamLogging +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer +import org.apache.openwhisk.core.scheduler.container._ +import org.apache.openwhisk.core.scheduler.message._ +import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool} +import org.apache.openwhisk.core.service.{RegisterData, UnregisterData} +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers} +import pureconfig.loadConfigOrThrow + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContextExecutor, Future} + +@RunWith(classOf[JUnitRunner]) +class CreationJobManagerTests + extends TestKit(ActorSystem("CreationJobManager")) + with ImplicitSender + with FlatSpecLike + with ScalaFutures + with Matchers + with MockFactory + with BeforeAndAfterAll + with BeforeAndAfterEach + with StreamLogging { + + private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond) + val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS) + implicit val ece: ExecutionContextExecutor = system.dispatcher + val config = new WhiskConfig(ExecManifest.requiredProperties) 
+ val creationIdTest = CreationId.generate() + val isBlackboxInvocation = false + + val testInvocationNamespace = "test-invocation-namespace" + val testNamespace = "test-namespace" + val testAction = "test-action" + val schedulerHost = "127.17.0.1" + val rpcPort = 13001 + val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) + val execAction = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec) + val execMetadata = + CodeExecMetaDataAsString(RuntimeManifest(execAction.exec.kind, ImageName("test")), entryPoint = Some("test")) + val revision = DocRevision("1-testRev") + val actionMetadata = + WhiskActionMetaData( + execAction.namespace, + execAction.name, + execMetadata, + execAction.parameters, + execAction.limits, + execAction.version, + execAction.publish, + execAction.annotations) + + override def afterAll(): Unit = { + client.close() + QueuePool.clear() + TestKit.shutdownActorSystem(system) + super.afterAll() + } + + override def beforeEach(): Unit = { + QueuePool.clear() + } + + def feedFactory(actorRefFactory: ActorRefFactory, + topic: String, + maxActiveAcksPerPoll: Int, + handler: Array[Byte] => Future[Unit]): ActorRef = { + TestProbe().ref + } + + def createRegisterMessage(action: FullyQualifiedEntityName, + revision: DocRevision, + sid: SchedulerInstanceId): RegisterCreationJob = { + val message = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + action, + revision, + actionMetadata, + sid, + schedulerHost, + rpcPort, + creationId = creationIdTest) + RegisterCreationJob(message) + } + + val action = FullyQualifiedEntityName(EntityPath("test namespace"), EntityName("actionName")) + val sid = SchedulerInstanceId("0") + val iid = InvokerInstanceId(0, userMemory = 1024.MB) + val testKey = inProgressContainer(testInvocationNamespace, action, revision, sid, creationIdTest) + val memory = 256.MB + val resources = Seq.empty[String] + val resourcesStrictPolicy 
= true + + val registerMessage = createRegisterMessage(action, revision, sid) + + val client: Client = { + val hostAndPorts = "172.17.0.1:2379" + Client.forEndpoints(hostAndPorts).withPlainText().build() + } + + behavior of "CreationJobManager" + + it should "register creation job" in { + val probe = TestProbe() + + val manager = + system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref)) + + manager ! registerMessage + + probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + } + + it should "skip duplicated creation job" in { + val probe = TestProbe() + + val manager = + system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref)) + + manager ! registerMessage + manager ! registerMessage + + probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + probe.expectNoMessage() + } + + def createFinishMessage(action: FullyQualifiedEntityName, + revision: DocRevision, + memory: ByteSize, + invokerInstanceId: InvokerInstanceId, + retryCount: Int = 0, + error: Option[ContainerCreationError] = None): FinishCreationJob = { + val message = + ContainerCreationAckMessage( + TransactionId.testing, + creationIdTest, + testInvocationNamespace, + action, + revision, + actionMetadata, + invokerInstanceId, + schedulerHost, + rpcPort, + retryCount, + error) + FinishCreationJob(message) + } + + def createRescheduling(finishMsg: FinishCreationJob): ReschedulingCreationJob = + ReschedulingCreationJob( + finishMsg.ack.transid, + finishMsg.ack.creationId, + finishMsg.ack.invocationNamespace, + finishMsg.ack.action, + finishMsg.ack.revision, + actionMetadata, + finishMsg.ack.schedulerHost, + finishMsg.ack.rpcPort, + finishMsg.ack.retryCount) + + val normalFinish = createFinishMessage(action, revision, memory, iid, retryCount = 0) + val failedFinish = + createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.UnknownError)) + val unrescheduleFinish = + createFinishMessage(action, revision, memory, iid, 
retryCount = 0, Some(ContainerCreationError.BlackBoxError)) + val tooManyFinish = + createFinishMessage(action, revision, memory, iid, retryCount = 100, Some(ContainerCreationError.UnknownError)) + + it should "delete a creation job normally and send a SuccessfulCreationJob to a queue" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + val probe = TestProbe() + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + + QueuePool.put( + MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)), + MemoryQueueValue(probe.ref, true)) + jobManager ! registerMessage + + dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + + jobManager ! normalFinish + + dataManagementService.expectMsg(UnregisterData(testKey)) + + containerManager.expectMsg( + SuccessfulCreationJob( + normalFinish.ack.creationId, + normalFinish.ack.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision)) + probe.expectMsg( + SuccessfulCreationJob( + normalFinish.ack.creationId, + normalFinish.ack.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision)) + } + + it should "only delete a creation job with failed msg after all retries are failed" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + + jobManager ! registerMessage + + dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + + jobManager ! failedFinish + + containerManager.expectMsg(createRescheduling(failedFinish)) + + jobManager ! 
failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5)) + dataManagementService.expectMsg(UnregisterData(testKey)) + } + + it should "delete a creation job with failed msg and send a FailedCreationJob to a queue" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + val probe = TestProbe() + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + + QueuePool.put( + MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)), + MemoryQueueValue(probe.ref, true)) + + jobManager ! registerMessage + + dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + + jobManager ! unrescheduleFinish + + dataManagementService.expectMsg(UnregisterData(testKey)) + + containerManager.expectMsg( + FailedCreationJob( + registerMessage.msg.creationId, + registerMessage.msg.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision, + ContainerCreationError.BlackBoxError, + "unknown reason")) + probe.expectMsg( + FailedCreationJob( + registerMessage.msg.creationId, + registerMessage.msg.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision, + ContainerCreationError.BlackBoxError, + "unknown reason")) + } + + it should "delete a creation job that does not exist with failed msg" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + + jobManager ! 
failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5)) + + dataManagementService.expectMsg(UnregisterData(testKey)) + } + + it should "delete a creation job with timeout" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + + jobManager ! registerMessage + + dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + + Thread.sleep(timeout.toMillis) // sleep for the configured in-progress job retention so that the timeout handler is executed + dataManagementService.expectMsg(UnregisterData(testKey)) + containerManager.expectMsg( + FailedCreationJob( + registerMessage.msg.creationId, + registerMessage.msg.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision, + ContainerCreationError.TimeoutError, + s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout")) + } + + it should "increase the timeout if an action is a blackbox action" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + + val execMetadata = + BlackBoxExecMetaData(ImageName("test image"), Some("main"), native = false); + + val actionMetaData = WhiskActionMetaData( + execAction.namespace, + execAction.name, + execMetadata, + execAction.parameters, + execAction.limits, + execAction.version, + execAction.publish, + execAction.annotations) + + val message = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + action, + revision, + actionMetaData, + sid, + schedulerHost, + rpcPort, + creationId = creationIdTest) + val creationMsg = RegisterCreationJob(message) + + jobManager ! 
creationMsg + + dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + + // no message may arrive within the base (non-blackbox) timeout + dataManagementService.expectNoMessage(timeout) + Thread.sleep(timeout.toMillis * 2) // blackbox timeout is 3x the base timeout; one base timeout already elapsed above, so wait for the remaining two + dataManagementService.expectMsg(UnregisterData(testKey)) + containerManager.expectMsg( + FailedCreationJob( + registerMessage.msg.creationId, + registerMessage.msg.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision, + ContainerCreationError.TimeoutError, + s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout")) + } + + it should "delete a creation job with too many retry and send a FailedCreationJob to a queue" in { + val containerManager = TestProbe() + val dataManagementService = TestProbe() + val probe = TestProbe() + val jobManager = + containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref)) + QueuePool.put( + MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)), + MemoryQueueValue(probe.ref, true)) + + jobManager ! registerMessage + + dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false)) + + jobManager ! tooManyFinish + + dataManagementService.expectMsg(UnregisterData(testKey)) + containerManager.expectMsg( + FailedCreationJob( + registerMessage.msg.creationId, + registerMessage.msg.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision, + ContainerCreationError.UnknownError, + "unknown reason")) + probe.expectMsg( + FailedCreationJob( + registerMessage.msg.creationId, + registerMessage.msg.invocationNamespace, + registerMessage.msg.action, + registerMessage.msg.revision, + ContainerCreationError.UnknownError, + "unknown reason")) + } +}