diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala index 5ce0f44d560..d3389b818bd 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala @@ -238,6 +238,7 @@ object TransactionId { val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers + val invokerColdstart = TransactionId(systemPrefix + "invokerColdstart") // Invoker cold start thread val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher val loadbalancer = TransactionId(systemPrefix + "loadbalancer") // Loadbalancer thread diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala index 9da672b2abc..71bc5fa4d6b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala @@ -18,6 +18,8 @@ package org.apache.openwhisk.core.containerpool.v2 import akka.actor.ActorRef +import org.apache.openwhisk.core.connector.ActivationMessage +import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName} import org.apache.openwhisk.core.scheduler.SchedulerEndpoints // Event send by the actor @@ -28,6 +30,14 @@ case object CloseClientProxy // Event received by the actor case object StartClient case class RequestActivation(lastDuration: Option[Long] = None, newScheduler: Option[SchedulerEndpoints] = None) +case class RescheduleActivation(invocationNamespace: String, + fqn: FullyQualifiedEntityName, + rev: DocRevision, + msg: ActivationMessage) + +case object RetryRequestActivation +case object ContainerWarmed +case object StopClientProxy // TODO, use grpc to fetch activation from memoryQueue class ActivationClientProxy {} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala index 6f943d8542d..3482023fb9f 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala @@ -17,15 +17,55 @@ package org.apache.openwhisk.core.containerpool.v2 +import java.net.InetSocketAddress import java.time.Instant -import akka.actor.ActorRef -import org.apache.openwhisk.common.TransactionId -import org.apache.openwhisk.core.containerpool.Container -import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction} +import akka.actor.Status.{Failure => FailureMessage} +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash} +import akka.event.Logging.InfoLevel +import akka.io.{IO, Tcp} +import akka.pattern.pipe +import 
akka.stream.ActorMaterializer +import org.apache.openwhisk.common.tracing.WhiskTracerProvider +import org.apache.openwhisk.common.{LoggingMarkers, TransactionId, _} +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.ack.ActiveAck +import org.apache.openwhisk.core.connector.{ + ActivationMessage, + CombinedCompletionAndResultMessage, + CompletionMessage, + ResultMessage +} +import org.apache.openwhisk.core.containerpool._ +import org.apache.openwhisk.core.containerpool.logging.LogCollectingException +import org.apache.openwhisk.core.containerpool.v2.FunctionPullingContainerProxy.{ + constructWhiskActivation, + containerName +} +import org.apache.openwhisk.core.database._ +import org.apache.openwhisk.core.entity.ExecManifest.ImageName import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, ActivationResponse => ExecutionResponse, _} +import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys +import org.apache.openwhisk.core.invoker.Invoker.LogsCollector +import org.apache.openwhisk.core.invoker.NamespaceBlacklist +import org.apache.openwhisk.core.scheduler.SchedulerEndpoints +import org.apache.openwhisk.core.service.{RegisterData, UnregisterData} +import org.apache.openwhisk.grpc.RescheduleResponse +import org.apache.openwhisk.http.Messages +import pureconfig.loadConfigOrThrow +import spray.json.DefaultJsonProtocol.{StringJsonFormat, _} +import spray.json._ +import pureconfig.generic.auto._ + +import scala.concurrent.duration._ +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} -import scala.concurrent.duration.{Deadline, FiniteDuration} +// Events used internally +case class RunActivation(action: ExecutableWhiskAction, msg: ActivationMessage) +case class RunActivationCompleted(container: Container, action: ExecutableWhiskAction, duration: Option[Long]) +case class InitCodeCompleted(data: WarmData) // Events received by the actor case class Initialize(invocationNamespace: String, @@ -62,6 +102,10 @@ case object Paused extends ProxyState case object Removing extends ProxyState case object Rescheduling extends ProxyState +// Errors +case class ContainerHealthErrorWithResumedRun(tid: TransactionId, msg: String, resumeRun: RunActivation) + extends Exception(msg) + // Data sealed abstract class Data(val memoryLimit: ByteSize) { def getContainer: Option[Container] @@ -94,6 +138,8 @@ case class InitializedData(container: Container, extends Data(action.limits.memory.megabytes.MB) with WithClient { override def getContainer = Some(container) + def toReschedulingData(resumeRun: RunActivation) = + ReschedulingData(container, invocationNamespace, action, clientProxy, resumeRun) } case class WarmData(container: Container, @@ -105,7 +151,1188 @@ case class WarmData(container: Container, extends Data(action.limits.memory.megabytes.MB) with WithClient { override def getContainer = Some(container) + def toReschedulingData(resumeRun: RunActivation) = + ReschedulingData(container, invocationNamespace, action, clientProxy, resumeRun) +} + +case class ReschedulingData(container: Container, + invocationNamespace: String, + action: ExecutableWhiskAction, + override val clientProxy: ActorRef, + resumeRun: RunActivation) + extends Data(action.limits.memory.megabytes.MB) + with WithClient { + override def getContainer = Some(container) +} + +class FunctionPullingContainerProxy( + factory: (TransactionId, + String, + ImageName, + Boolean, + ByteSize, + Int, + Option[ExecutableWhiskAction]) => Future[Container], 
+ entityStore: ArtifactStore[WhiskEntity], + namespaceBlacklist: NamespaceBlacklist, + get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction], + dataManagementService: ActorRef, + clientProxyFactory: (ActorRefFactory, + String, + FullyQualifiedEntityName, + DocRevision, + String, + Int, + ContainerId) => ActorRef, + sendActiveAck: ActiveAck, + storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any], + collectLogs: LogsCollector, + getLiveContainerCount: (String, FullyQualifiedEntityName, DocRevision) => Future[Long], + getWarmedContainerLimit: (String) => Future[(Int, FiniteDuration)], + instance: InvokerInstanceId, + invokerHealthManager: ActorRef, + poolConfig: ContainerPoolConfig, + timeoutConfig: ContainerProxyTimeoutConfig, + healtCheckConfig: ContainerProxyHealthCheckConfig, + testTcp: Option[ActorRef])(implicit actorSystem: ActorSystem, mat: ActorMaterializer, logging: Logging) + extends FSM[ProxyState, Data] + with Stash { + startWith(Uninitialized, NonexistentData()) + + implicit val ec = actorSystem.dispatcher + + private val UnusedTimeoutName = "UnusedTimeout" + private val unusedTimeout = timeoutConfig.pauseGrace + private val IdleTimeoutName = "PausingTimeout" + private val idleTimeout = timeoutConfig.idleContainer + private val KeepingTimeoutName = "KeepingTimeout" + private val RunningActivationTimeoutName = "RunningActivationTimeout" + private val runningActivationTimeout = 10.seconds + + private var timedOut = false + + var healthPingActor: Option[ActorRef] = None // set up after prewarm starts + val tcp: ActorRef = testTcp.getOrElse(IO(Tcp)) // allows testing interaction with the Tcp extension + + val runningActivations = new java.util.concurrent.ConcurrentHashMap[String, Boolean] + + when(Uninitialized) { + // pre-warm a container (creates a stem cell container) + case Event(job: Start, _) => + factory( + TransactionId.invokerWarmup, + containerName(instance, "prewarm", job.exec.kind), + job.exec.image, + job.exec.pull, + job.memoryLimit, + poolConfig.cpuShare(job.memoryLimit), + None) + .map(container => PreWarmData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow))) + .pipeTo(self) + goto(CreatingContainer) + + // cold start + case Event(job: Initialize, _) => + factory( // create a new container + TransactionId.invokerColdstart, + containerName(instance, job.action.namespace.namespace, job.action.name.asString), + job.action.exec.image, + job.action.exec.pull, + job.action.limits.memory.megabytes.MB, + poolConfig.cpuShare(job.action.limits.memory.megabytes.MB), + None) + .andThen { + case Failure(t) => + context.parent ! ContainerCreationFailed(t) + } + .map { container => + logging.debug(this, s"a container ${container.containerId} is created for ${job.action}") + // create a client + Try( + clientProxyFactory( + context, + job.invocationNamespace, + job.action.fullyQualifiedName(true), + job.action.rev, + job.schedulerHost, + job.rpcPort, + container.containerId)) match { + case Success(clientProxy) => + clientProxy ! 
StartClient + ContainerCreatedData(container, job.invocationNamespace, job.action) + case Failure(t) => + logging.error(this, s"failed to create activation client caused by: $t") + ClientCreationFailed(t, container, job.invocationNamespace, job.action) + } + } + .pipeTo(self) + + goto(CreatingClient) + + case _ => delay + } + + when(CreatingContainer) { + // container was successfully obtained + case Event(completed: PreWarmData, _: NonexistentData) => + context.parent ! ReadyToWork(completed) + goto(ContainerCreated) using completed + + // container creation failed + case Event(t: FailureMessage, _: NonexistentData) => + context.parent ! ContainerRemoved(true) + stop() + + case _ => delay + } + + // prewarmed state, container created + when(ContainerCreated) { + case Event(job: Initialize, data: PreWarmData) => + Try( + clientProxyFactory( + context, + job.invocationNamespace, + job.action.fullyQualifiedName(true), + job.action.rev, + job.schedulerHost, + job.rpcPort, + data.container.containerId)) match { + case Success(proxy) => + proxy ! StartClient + case Failure(t) => + logging.error(this, s"failed to create activation client for ${job.action} caused by: $t") + self ! ClientCreationFailed(t, data.container, job.invocationNamespace, job.action) + } + + goto(CreatingClient) using ContainerCreatedData(data.container, job.invocationNamespace, job.action) + + case Event(Remove, data: PreWarmData) => + cleanUp(data.container, None, false) + + // prewarm container failed by health check + case Event(_: FailureMessage, data: PreWarmData) => + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM) + cleanUp(data.container, None) + + case _ => delay + } + + when(CreatingClient) { + // wait for client creation when cold start + case Event(job: ContainerCreatedData, _: NonexistentData) => + stay() using job + + // wait for container creation when cold start + case Event(ClientCreationCompleted(proxy), _: NonexistentData) => + self ! ClientCreationCompleted(proxy.orElse(Some(sender()))) + stay() + + // client was successfully obtained + case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) => + val clientProxy = proxy.getOrElse(sender()) + val fqn = data.action.fullyQualifiedName(true) + val revision = data.action.rev + dataManagementService ! RegisterData( + s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, revision, Some(instance), Some(data.container.containerId))}", + "") + self ! InitializedData(data.container, data.invocationNamespace, data.action, clientProxy) + goto(ClientCreated) + + // client creation failed + case Event(t: ClientCreationFailed, _) => + invokerHealthManager ! HealthMessage(state = false) + cleanUp(t.container, t.invocationNamespace, t.action.fullyQualifiedName(withVersion = true), t.action.rev, None) + + // there can be a case that client create is failed and a ClientClosed will be sent by ActivationClientProxy + // wait for container creation when cold start + case Event(ClientClosed, _: NonexistentData) => + self ! ClientClosed + stay() + + case Event(ClientClosed, data: ContainerCreatedData) => + invokerHealthManager ! HealthMessage(state = false) + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + None) + + // container creation failed when cold start + case Event(t: FailureMessage, _) => + context.parent ! 
ContainerRemoved(true) + stop() + + case _ => delay + } + + // this is for first invocation, once the first invocation is over we are ready to trigger getActivation for action concurrency + when(ClientCreated) { + // 1. request activation message to client + case Event(initializedData: InitializedData, _) => + context.parent ! Initialized(initializedData) + initializedData.clientProxy ! RequestActivation() + setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout) + stay() using initializedData + + // 2. read executable action data from db + case Event(job: ActivationMessage, data: InitializedData) => + timedOut = false + cancelTimer(UnusedTimeoutName) + handleActivationMessage(job, data.action) + .pipeTo(self) + stay() using data + + // 3. request initialize and run command to container + case Event(job: RunActivation, data: InitializedData) => + implicit val transid = job.msg.transid + logging.debug(this, s"received RunActivation ${job.msg.activationId} for ${job.action} in $stateName") + + initializeAndRunActivation(data.container, data.clientProxy, job.action, job.msg, Some(job)) + .map { activation => + RunActivationCompleted(data.container, job.action, activation.duration) + } + .pipeTo(self) + + // when it receives InitCodeCompleted, it will move to Running + stay using data + + case Event(RetryRequestActivation, data: InitializedData) => + // if this Container is marked with time out, do not retry + if (timedOut) + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + else { + data.clientProxy ! RequestActivation() + stay() + } + + // code initialization was successful + case Event(completed: InitCodeCompleted, data: InitializedData) => + // TODO support concurrency? + data.clientProxy ! ContainerWarmed // this container is warmed + 1 until completed.data.action.limits.concurrency.maxConcurrent foreach { _ => + data.clientProxy ! RequestActivation() + } + + goto(Running) using completed.data // set warm data + + // ContainerHealthError should cause + case Event(FailureMessage(e: ContainerHealthErrorWithResumedRun), data: InitializedData) => + logging.error( + this, + s"container ${data.container.containerId.asString} health check failed on $stateName, ${e.resumeRun.msg.activationId} activation will be rescheduled") + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM) + + // reschedule message + data.clientProxy ! 
RescheduleActivation( + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + e.resumeRun.msg) + + goto(Rescheduling) using data.toReschedulingData(e.resumeRun) + + // Failed to get activation or execute the action + case Event(t: FailureMessage, data: InitializedData) => + logging.error( + this, + s"failed to initialize a container or run an activation for ${data.action} in state: $stateName caused by: $t") + // Stop containerProxy and ActivationClientProxy both immediately + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + + case Event(StateTimeout, data: InitializedData) => + logging.info(this, s"No more activation is coming in state: $stateName, action: ${data.action}") + // Just mark the ContainerProxy is timedout + timedOut = true + + stay() // stay here because the ActivationClientProxy may send a new Activation message + + case Event(ClientClosed, data: InitializedData) => + logging.error(this, s"The Client closed in state: $stateName, action: ${data.action}") + // Stop ContainerProxy(ActivationClientProxy will stop also when send ClientClosed to ContainerProxy). + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + None) + + case _ => delay + } + + when(Rescheduling, stateTimeout = 10.seconds) { + + case Event(res: RescheduleResponse, data: ReschedulingData) => + implicit val transId = data.resumeRun.msg.transid + if (!res.isRescheduled) { + logging.warn(this, s"failed to reschedule the message ${data.resumeRun.msg.activationId}, clean up data") + fallbackActivationForReschedulingData(data) + } else { + logging.warn(this, s"unhandled message is rescheduled, clean up data") + } + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + + case Event(StateTimeout, data: ReschedulingData) => + logging.error(this, s"Timeout for rescheduling message ${data.resumeRun.msg.activationId}, clean up data")( + data.resumeRun.msg.transid) + + fallbackActivationForReschedulingData(data) + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + } + + when(Running) { + // Run was successful. + // 1. request activation message to client + case Event(activationResult: RunActivationCompleted, data: WarmData) => + // create timeout + setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout) + data.clientProxy ! RequestActivation(activationResult.duration) + stay() using data + + // 2. read executable action data from db + case Event(job: ActivationMessage, data: WarmData) => + timedOut = false + cancelTimer(UnusedTimeoutName) + handleActivationMessage(job, data.action) + .pipeTo(self) + stay() using data + + // 3. 
request run command to container + case Event(job: RunActivation, data: WarmData) => + logging.debug(this, s"received RunActivation ${job.msg.activationId} for ${job.action} in $stateName") + implicit val transid = job.msg.transid + + initializeAndRunActivation(data.container, data.clientProxy, job.action, job.msg, Some(job)) + .map { activation => + RunActivationCompleted(data.container, job.action, activation.duration) + } + .pipeTo(self) + stay using data.copy(lastUsed = Instant.now) + + case Event(RetryRequestActivation, data: WarmData) => + // if this Container is marked with time out, do not retry + if (timedOut) { + data.container.suspend()(TransactionId.invokerNanny).map(_ => ContainerPaused).pipeTo(self) + goto(Pausing) + } else { + data.clientProxy ! RequestActivation() + stay() + } + + case Event(_: ResumeFailed, data: WarmData) => + invokerHealthManager ! HealthMessage(state = false) + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + + // ContainerHealthError should cause + case Event(FailureMessage(e: ContainerHealthError), data: WarmData) => + logging.error(this, s"health check failed on $stateName caused by: ContainerHealthError $e") + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM) + // Stop containerProxy and ActivationClientProxy both immediately, + invokerHealthManager ! HealthMessage(state = false) + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + + // ContainerHealthError should cause + case Event(FailureMessage(e: ContainerHealthErrorWithResumedRun), data: WarmData) => + logging.error( + this, + s"container ${data.container.containerId.asString} health check failed on $stateName, ${e.resumeRun.msg.activationId} activation will be rescheduled") + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM) + + // reschedule message + data.clientProxy ! RescheduleActivation( + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + e.resumeRun.msg) + + goto(Rescheduling) using data.toReschedulingData(e.resumeRun) + + // Failed to get activation or execute the action + case Event(t: FailureMessage, data: WarmData) => + logging.error(this, s"failed to init or run in state: $stateName caused by: $t") + // Stop containerProxy and ActivationClientProxy both immediately, + // and don't send unhealthy state message to the health manager, it's already sent. + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + Some(data.clientProxy)) + + case Event(StateTimeout, data: WarmData) => + logging.info( + this, + s"No more run activation is coming in state: $stateName, action: ${data.action}, container: ${data.container.containerId}") + // Just mark the ContainerProxy is timedout + timedOut = true + + stay() // stay here because the ActivationClientProxy may send a new Activation message + + case Event(ClientClosed, data: WarmData) => + if (runningActivations.isEmpty) { + logging.info(this, s"The Client closed in state: $stateName, action: ${data.action}") + // Stop ContainerProxy(ActivationClientProxy will stop also when send ClientClosed to ContainerProxy). 
+ cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(withVersion = true), + data.action.rev, + None) + } else { + logging.info( + this, + s"Remaining running activations ${runningActivations.keySet().toString()} when received ClientClosed") + setTimer(RunningActivationTimeoutName, ClientClosed, runningActivationTimeout) + stay + } + + // shut down the client first and wait for any remaining activation to be executed + // ContainerProxy will be terminated by StateTimeout if there is no further activation + case Event(GracefulShutdown, data: WarmData) => + logging.info(this, s"received GracefulShutdown for action: ${data.action}") + // Just send CloseClientProxy to ActivationClientProxy, making ActivationClientProxy throw ClientClosedException the next time it fetches an activation. + data.clientProxy ! CloseClientProxy + stay + + case _ => delay + } + + when(Pausing) { + case Event(ContainerPaused, data: WarmData) => + dataManagementService ! RegisterData( + ContainerKeys.warmedContainers( + data.invocationNamespace, + data.action.fullyQualifiedName(false), + data.revision, + instance, + data.container.containerId), + "") + // remove the existing key so the MemoryQueue can be terminated on timeout + dataManagementService ! UnregisterData( + s"${ContainerKeys.existingContainers(data.invocationNamespace, data.action.fullyQualifiedName(true), data.action.rev, Some(instance), Some(data.container.containerId))}") + context.parent ! ContainerIsPaused(data) + goto(Paused) + + case Event(_: FailureMessage, data: WarmData) => + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(false), + data.action.rev, + Some(data.clientProxy)) + + case _ => delay + } + + when(Paused) { + case Event(job: Initialize, data: WarmData) => + implicit val transId = job.transId + val parent = context.parent + cancelTimer(IdleTimeoutName) + cancelTimer(KeepingTimeoutName) + data.container + .resume() + .map { _ => + logging.info(this, s"Resumed container ${data.container.containerId}") + // put existing key again + dataManagementService ! RegisterData( + s"${ContainerKeys.existingContainers(data.invocationNamespace, data.action.fullyQualifiedName(true), data.action.rev, Some(instance), Some(data.container.containerId))}", + "") + parent ! Resumed(data) + // the new queue may be located on a different scheduler, so recreate the activation client when necessary + // since the akka port will not be used, we can put any value except 0 here + data.clientProxy ! RequestActivation( + newScheduler = Some(SchedulerEndpoints(job.schedulerHost, job.rpcPort, 10))) + setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout) + timedOut = false + } + .recover { + case t: Throwable => + logging.error(this, s"Failed to resume container ${data.container.containerId}, error: $t") + parent ! ResumeFailed(data) + self ! ResumeFailed(data) + } + + // always clean up data in etcd regardless of success or failure + dataManagementService ! 
UnregisterData( + ContainerKeys.warmedContainers( + data.invocationNamespace, + data.action.fullyQualifiedName(false), + data.revision, + instance, + data.container.containerId)) + goto(Running) + + case Event(StateTimeout, data: WarmData) => + (for { + count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision) + (warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit( + data.invocationNamespace) + } yield { + logging.info( + this, + s"Live container count: ${count}, warmed container keeping count configuration: ${warmedContainerKeepingCount} in namespace: ${data.invocationNamespace}") + if (count <= warmedContainerKeepingCount) { + Keep(warmedContainerKeepingTimeout) + } else { + Remove + } + }).pipeTo(self) + stay + + case Event(Keep(warmedContainerKeepingTimeout), data: WarmData) => + logging.info( + this, + s"This is the remaining container for ${data.action}. The container will stop after $warmedContainerKeepingTimeout.") + setTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout) + stay + + case Event(Remove | GracefulShutdown, data: WarmData) => + dataManagementService ! UnregisterData( + ContainerKeys.warmedContainers( + data.invocationNamespace, + data.action.fullyQualifiedName(false), + data.revision, + instance, + data.container.containerId)) + cleanUp( + data.container, + data.invocationNamespace, + data.action.fullyQualifiedName(false), + data.action.rev, + Some(data.clientProxy)) + + case _ => delay + } + + when(Removing, unusedTimeout) { + // only if ClientProxy is closed, ContainerProxy stops. So it is important for ClientProxy to send ClientClosed. + case Event(ClientClosed, _) => + stop() + + // even if any error occurs, it still waits for ClientClosed event in order to be stopped after the client is closed. + case Event(t: FailureMessage, _) => + logging.error(this, s"unable to delete a container due to ${t}") + + stay + + case Event(StateTimeout, _) => + logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.") + + stop + + case Event(Remove | GracefulShutdown, _) => + stay() + } + + onTransition { + case _ -> Uninitialized => unstashAll() + case _ -> CreatingContainer => unstashAll() + case _ -> ContainerCreated => + if (healtCheckConfig.enabled) { + nextStateData.getContainer.foreach { c => + logging.info(this, s"enabling health ping for ${c.containerId.asString} on ContainerCreated") + enableHealthPing(c) + } + } + unstashAll() + case _ -> CreatingClient => unstashAll() + case _ -> ClientCreated => unstashAll() + case _ -> Running => + if (healtCheckConfig.enabled && healthPingActor.isDefined) { + nextStateData.getContainer.foreach { c => + logging.info(this, s"disabling health ping for ${c.containerId.asString} on Running") + disableHealthPing() + } + } + unstashAll() + case _ -> Paused => setTimer(IdleTimeoutName, StateTimeout, idleTimeout) + case _ -> Removing => unstashAll() + } + + initialize() + + /** Delays all incoming messages until unstashAll() is called */ + def delay = { + stash() + stay + } + + /** + * Only change the state if the currentState is not the newState. 
+ * + * @param newState of the InvokerActor + */ + private def gotoIfNotThere(newState: ProxyState) = { + if (stateName == newState) stay() else goto(newState) + } + + /** + * Clean up all meta data of invoking action + * + * @param container the container to destroy + * @param fqn the action to stop + * @param clientProxy the client to destroy + * @return + */ + private def cleanUp(container: Container, + invocationNamespace: String, + fqn: FullyQualifiedEntityName, + revision: DocRevision, + clientProxy: Option[ActorRef]): State = { + + dataManagementService ! UnregisterData( + s"${ContainerKeys.existingContainers(invocationNamespace, fqn, revision, Some(instance), Some(container.containerId))}") + + cleanUp(container, clientProxy) + } + + private def cleanUp(container: Container, clientProxy: Option[ActorRef], replacePrewarm: Boolean = true): State = { + + context.parent ! ContainerRemoved(replacePrewarm) + val unpause = stateName match { + case Paused => container.resume()(TransactionId.invokerNanny) + case _ => Future.successful(()) + } + unpause.andThen { + case Success(_) => destroyContainer(container) + case Failure(t) => + // docker may hang when try to remove a paused container, so we shouldn't remove it + logging.error(this, s"Failed to resume container ${container.containerId}, error: $t") + } + clientProxy match { + case Some(clientProxy) => clientProxy ! StopClientProxy + case None => self ! ClientClosed + } + gotoIfNotThere(Removing) + } + + /** + * Destroys the container + * + * @param container the container to destroy + */ + private def destroyContainer(container: Container) = { + container + .destroy()(TransactionId.invokerNanny) + .andThen { + case Failure(t) => + logging.error(this, s"Failed to destroy container: ${container.containerId.asString} caused by ${t}") + } + } + + private def handleActivationMessage(msg: ActivationMessage, action: ExecutableWhiskAction): Future[RunActivation] = { + implicit val transid = msg.transid + logging.info(this, s"received a message ${msg.activationId} for ${msg.action} in $stateName") + if (!namespaceBlacklist.isBlacklisted(msg.user)) { + logging.debug(this, s"namespace ${msg.user.namespace.name} is not in the namespaceBlacklist") + val namespace = msg.action.path + val name = msg.action.name + val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) + val subject = msg.user.subject + + logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") + + // set trace context to continue tracing + WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) + + // caching is enabled since actions have revision id and an updated + // action will not hit in the cache due to change in the revision id; + // if the doc revision is missing, then bypass cache + if (actionid.rev == DocRevision.empty) + logging.warn(this, s"revision was not provided for ${actionid.id}") + + get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty) + .flatMap { action => + action.toExecutableWhiskAction match { + case Some(executable) => + Future.successful(RunActivation(executable, msg)) + case None => + logging + .error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") + Future.failed(new IllegalStateException("non-executable action reached the invoker")) + } + } + .recoverWith { + case DocumentRevisionMismatchException(_) => + // if revision is mismatched, the action may have been updated, + // so try again with the latest code + logging.warn( + this, + 
s"msg ${msg.activationId} for ${msg.action} in $stateName is updated, fetching latest code") + handleActivationMessage(msg.copy(revision = DocRevision.empty), action) + case t => + // If the action cannot be found, the user has concurrently deleted it, + // making this an application error. All other errors are considered system + // errors and should cause the invoker to be considered unhealthy. + val response = t match { + case _: NoDocumentException => + ExecutionResponse.applicationError(Messages.actionRemovedWhileInvoking) + case _: DocumentTypeMismatchException | _: DocumentUnreadable => + ExecutionResponse.whiskError(Messages.actionMismatchWhileInvoking) + case e: Throwable => + logging.error(this, s"An unknown DB connection error occurred while fetching an action: $e.") + ExecutionResponse.whiskError(Messages.actionFetchErrorWhileInvoking) + } + logging.error( + this, + s"Error to fetch action ${msg.action} for msg ${msg.activationId}, error is ${t.getMessage}") + + val context = UserContext(msg.user) + val activation = generateFallbackActivation(action, msg, response) + sendActiveAck( + transid, + activation, + msg.blocking, + msg.rootControllerIndex, + msg.user.namespace.uuid, + CombinedCompletionAndResultMessage(transid, activation, instance)) + storeActivation(msg.transid, activation, msg.blocking, context) + + // in case action is removed container proxy should be terminated + Future.failed(new IllegalStateException("action does not exist")) + } + } else { + // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol + // Due to the protective nature of the blacklist, a database entry is not written. + val activation = + generateFallbackActivation(action, msg, ExecutionResponse.applicationError(Messages.namespacesBlacklisted)) + sendActiveAck( + msg.transid, + activation, + false, + msg.rootControllerIndex, + msg.user.namespace.uuid, + CombinedCompletionAndResultMessage(msg.transid, activation, instance)) + logging.warn( + this, + s"namespace ${msg.user.namespace.name} was blocked in containerProxy, complete msg ${msg.activationId} with error.") + Future.failed(new IllegalStateException(s"namespace ${msg.user.namespace.name} was blocked in containerProxy.")) + } + + } + + private def enableHealthPing(c: Container) = { + val hpa = healthPingActor.getOrElse { + logging.info(this, s"creating health ping actor for ${c.addr.asString()}") + val hp = context.actorOf( + TCPPingClient + .props(tcp, c.toString(), healtCheckConfig, new InetSocketAddress(c.addr.host, c.addr.port))) + healthPingActor = Some(hp) + hp + } + hpa ! HealthPingEnabled(true) + } + + private def disableHealthPing() = { + healthPingActor.foreach(_ ! HealthPingEnabled(false)) + } + + def fallbackActivationForReschedulingData(data: ReschedulingData): Unit = { + val context = UserContext(data.resumeRun.msg.user) + val activation = + generateFallbackActivation(data.action, data.resumeRun.msg, ExecutionResponse.whiskError(Messages.abnormalRun)) + + sendActiveAck( + data.resumeRun.msg.transid, + activation, + data.resumeRun.msg.blocking, + data.resumeRun.msg.rootControllerIndex, + data.resumeRun.msg.user.namespace.uuid, + CombinedCompletionAndResultMessage(data.resumeRun.msg.transid, activation, instance)) + + storeActivation(data.resumeRun.msg.transid, activation, data.resumeRun.msg.blocking, context) + } + + /** + * Runs the job, initialize first if necessary. + * Completes the job by: + * 1. sending an activate ack, + * 2. fetching the logs for the run, + * 3. 
indicating the resource is free to the parent pool, + * 4. recording the result to the data store + * + * @param container the container to run the job on + * @param job the job to run + * @return a future completing after logs have been collected and + * added to the WhiskActivation + */ + private def initializeAndRunActivation( + container: Container, + clientProxy: ActorRef, + action: ExecutableWhiskAction, + msg: ActivationMessage, + resumeRun: Option[RunActivation] = None)(implicit tid: TransactionId): Future[WhiskActivation] = { + // Add the activation to runningActivations set + runningActivations.put(msg.activationId.asString, true) + + val actionTimeout = action.limits.timeout.duration + + val (env, parameters) = ContainerProxy.partitionArguments(msg.content, msg.initArgs) + + val environment = Map( + "namespace" -> msg.user.namespace.name.toJson, + "action_name" -> msg.action.qualifiedNameWithLeadingSlash.toJson, + "action_version" -> msg.action.version.toJson, + "activation_id" -> msg.activationId.toString.toJson, + "transaction_id" -> msg.transid.id.toJson) + + // if the action requests the api key to be injected into the action context, add it here; + // treat a missing annotation as requesting the api key for backward compatibility + val authEnvironment = { + if (action.annotations.isTruthy(Annotations.ProvideApiKeyAnnotationName, valueForNonExistent = true)) { + msg.user.authkey.toEnvironment.fields + } else Map.empty + } + + // Only initialize iff we haven't yet warmed the container + val initialize = stateData match { + case _: WarmData => + Future.successful(None) + case _ => + val owEnv = (authEnvironment ++ environment ++ Map( + "deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)) map { + case (key, value) => "__OW_" + key.toUpperCase -> value + } + container + .initialize(action.containerInitializer(env ++ owEnv), actionTimeout, action.limits.concurrency.maxConcurrent) + .map(Some(_)) + } + + val activation: Future[WhiskActivation] = initialize + .flatMap { initInterval => + // immediately setup warmedData for use (before first execution) so that concurrent actions can use it asap + if (initInterval.isDefined) { + stateData match { + case _: InitializedData => + self ! 
InitCodeCompleted( + WarmData(container, msg.user.namespace.name.asString, action, msg.revision, Instant.now, clientProxy)) + + case _ => + Future.failed(new IllegalStateException("lease does not exist")) + } + } + val env = authEnvironment ++ environment ++ Map( + // computing the deadline on the invoker side avoids discrepancies inside the container + // but potentially under-estimates the actual deadline + "deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson) + + container + .run( + parameters, + env.toJson.asJsObject, + actionTimeout, + action.limits.concurrency.maxConcurrent, + resumeRun.isDefined)(msg.transid) + .map { + case (runInterval, response) => + val initRunInterval = initInterval + .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end)) + .getOrElse(runInterval) + constructWhiskActivation( + action, + msg, + initInterval, + initRunInterval, + runInterval.duration >= actionTimeout, + response) + } + } + .recoverWith { + case h: ContainerHealthError if resumeRun.isDefined => + // health error occurs + logging.error(this, s"caught health check error while running activation") + Future.failed(ContainerHealthErrorWithResumedRun(h.tid, h.msg, resumeRun.get)) + + case InitializationError(interval, response) => + Future.successful( + constructWhiskActivation( + action, + msg, + Some(interval), + interval, + interval.duration >= actionTimeout, + response)) + + case t => + // Actually, this should never happen - but we want to make sure to not miss a problem + logging.error(this, s"caught unexpected error while running activation: $t") + Future.successful( + constructWhiskActivation( + action, + msg, + None, + Interval.zero, + false, + ExecutionResponse.whiskError(Messages.abnormalRun))) + } + + val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(action) + // Sending an active ack is an asynchronous operation. The result is forwarded as soon as + // possible for blocking activations so that dependent activations can be scheduled. The + // completion message which frees a load balancer slot is sent after the active ack future + // completes to ensure proper ordering. + val sendResult = if (msg.blocking) { + activation.map { result => + val ackMsg = + if (splitAckMessagesPendingLogCollection) ResultMessage(tid, result) + else CombinedCompletionAndResultMessage(tid, result, instance) + sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg) + } + } else { + // For non-blocking requests, do not forward the result. + if (splitAckMessagesPendingLogCollection) Future.successful(()) + else + activation.map { result => + val ackMsg = CompletionMessage(tid, result, instance) + sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg) + } + } + + activation.foreach { activation => + val healthMessage = HealthMessage(!activation.response.isWhiskError) + invokerHealthManager ! healthMessage + } + + val context = UserContext(msg.user) + + // Adds logs to the raw activation. 
+ val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation + .flatMap { activation => + // Skips log collection entirely, if the limit is set to 0 + if (action.limits.logs.asMegaBytes == 0.MB) { + Future.successful(Right(activation)) + } else { + val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel) + collectLogs(tid, msg.user, activation, container, action) + .andThen { + case Success(_) => tid.finished(this, start) + case Failure(t) => tid.failed(this, start, s"reading logs failed: $t") + } + .map(logs => Right(activation.withLogs(logs))) + .recover { + case LogCollectingException(logs) => + Left(ActivationLogReadingError(activation.withLogs(logs))) + case _ => + Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure))))) + } + } + } + + activationWithLogs + .map(_.fold(_.activation, identity)) + .foreach { activation => + // Sending the completion message to the controller after the active ack ensures proper ordering + // (result is received before the completion message for blocking invokes). + if (splitAckMessagesPendingLogCollection) { + sendResult.onComplete( + _ => + sendActiveAck( + tid, + activation, + msg.blocking, + msg.rootControllerIndex, + msg.user.namespace.uuid, + CompletionMessage(tid, activation, instance))) + } + + // Storing the record. Entirely asynchronous and not waited upon. + storeActivation(tid, activation, msg.blocking, context) + } + + // Disambiguate activation errors and transform the Either into a failed/successful Future respectively. + activationWithLogs + .andThen { + // remove activationId from runningActivations in any case + case _ => runningActivations.remove(msg.activationId.asString) + } + .flatMap { + case Right(act) if !act.response.isSuccess && !act.response.isApplicationError => + Future.failed(ActivationUnsuccessfulError(act)) + case Left(error) => Future.failed(error) + case Right(act) => Future.successful(act) + } + } + + /** Generates an activation with zero runtime. 
Usually used for error cases */ + private def generateFallbackActivation(action: ExecutableWhiskAction, + msg: ActivationMessage, + response: ExecutionResponse): WhiskActivation = { + val now = Instant.now + val causedBy = if (msg.causedBySequence) { + Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) + } else None + + WhiskActivation( + activationId = msg.activationId, + namespace = msg.user.namespace.name.toPath, + subject = msg.user.subject, + cause = msg.cause, + name = msg.action.name, + version = msg.action.version.getOrElse(SemVer()), + start = now, + end = now, + duration = Some(0), + response = response, + annotations = { + Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString)) ++ + Parameters(WhiskActivation.kindAnnotation, JsString(action.exec.kind)) ++ + causedBy + }) + } + } -// TODO -class FunctionPullingContainerProxy {} +object FunctionPullingContainerProxy { + + def props(factory: (TransactionId, + String, + ImageName, + Boolean, + ByteSize, + Int, + Option[ExecutableWhiskAction]) => Future[Container], + entityStore: ArtifactStore[WhiskEntity], + namespaceBlacklist: NamespaceBlacklist, + get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction], + dataManagementService: ActorRef, + clientProxyFactory: (ActorRefFactory, + String, + FullyQualifiedEntityName, + DocRevision, + String, + Int, + ContainerId) => ActorRef, + ack: ActiveAck, + store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any], + collectLogs: LogsCollector, + getLiveContainerCount: (String, FullyQualifiedEntityName, DocRevision) => Future[Long], + getWarmedContainerLimit: (String) => Future[(Int, FiniteDuration)], + instance: InvokerInstanceId, + invokerHealthManager: ActorRef, + poolConfig: ContainerPoolConfig, + timeoutConfig: ContainerProxyTimeoutConfig, + healthCheckConfig: ContainerProxyHealthCheckConfig = + loadConfigOrThrow[ContainerProxyHealthCheckConfig](ConfigKeys.containerProxyHealth), + tcp: Option[ActorRef] = None)(implicit actorSystem: ActorSystem, mat: ActorMaterializer, logging: Logging) = + Props( + new FunctionPullingContainerProxy( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService, + clientProxyFactory, + ack, + store, + collectLogs, + getLiveContainerCount, + getWarmedContainerLimit, + instance, + invokerHealthManager, + poolConfig, + timeoutConfig, + healthCheckConfig, + tcp)) + + private val containerCount = new Counter + + /** + * Generates a unique container name. + * + * @param prefix the container name's prefix + * @param suffix the container name's suffix + * @return a unique container name + */ + def containerName(instance: InvokerInstanceId, prefix: String, suffix: String): String = { + def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_' + + val sanitizedPrefix = prefix.filter(isAllowed) + val sanitizedSuffix = suffix.filter(isAllowed) + + s"${ContainerFactory.containerNamePrefix(instance)}_${containerCount.next()}_${sanitizedPrefix}_${sanitizedSuffix}" + } + + /** + * Creates a WhiskActivation ready to be sent via active ack. 
+ * + * @param job the job that was executed + * @param interval the time it took to execute the job + * @param response the response to return to the user + * @return a WhiskActivation to be sent to the user + */ + def constructWhiskActivation(action: ExecutableWhiskAction, + msg: ActivationMessage, + initInterval: Option[Interval], + totalInterval: Interval, + isTimeout: Boolean, + response: ExecutionResponse) = { + + val causedBy = if (msg.causedBySequence) { + Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) + } else None + + val waitTime = { + val end = initInterval.map(_.start).getOrElse(totalInterval.start) + Parameters(WhiskActivation.waitTimeAnnotation, Interval(msg.transid.meta.start, end).duration.toMillis.toJson) + } + + val initTime = { + initInterval.map(initTime => Parameters(WhiskActivation.initTimeAnnotation, initTime.duration.toMillis.toJson)) + } + + val binding = + msg.action.binding.map(f => Parameters(WhiskActivation.bindingAnnotation, JsString(f.asString))) + + WhiskActivation( + activationId = msg.activationId, + namespace = msg.user.namespace.name.toPath, + subject = msg.user.subject, + cause = msg.cause, + name = action.name, + version = action.version, + start = totalInterval.start, + end = totalInterval.end, + duration = Some(totalInterval.duration.toMillis), + response = response, + annotations = { + Parameters(WhiskActivation.limitsAnnotation, action.limits.toJson) ++ + Parameters(WhiskActivation.pathAnnotation, JsString(action.fullyQualifiedName(false).asString)) ++ + Parameters(WhiskActivation.kindAnnotation, JsString(action.exec.kind)) ++ + Parameters(WhiskActivation.timeoutAnnotation, JsBoolean(isTimeout)) ++ + causedBy ++ initTime ++ waitTime ++ binding + }) + } + +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala new file mode 100644 index 00000000000..479739e23b0 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala @@ -0,0 +1,2920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.containerpool.v2.test + +import java.net.InetSocketAddress +import java.time.Instant +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition} +import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} +import akka.http.scaladsl.model +import akka.io.Tcp.Connect +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import akka.util.ByteString +import com.ibm.etcd.api.{DeleteRangeResponse, KeyValue, PutResponse} +import com.ibm.etcd.client.{EtcdClient => Client} +import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction} +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.ack.ActiveAck +import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage} +import org.apache.openwhisk.core.containerpool.logging.LogCollectingException +import org.apache.openwhisk.core.containerpool.v2._ +import org.apache.openwhisk.core.containerpool.{ + ContainerRemoved, + Paused => _, + Pausing => _, + Removing => _, + Running => _, + Start => _, + Uninitialized => _, + _ +} +import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserContext} +import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.entity.types.AuthStore +import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, _} +import org.apache.openwhisk.core.etcd.EtcdClient +import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys +import org.apache.openwhisk.core.etcd.EtcdType._ +import org.apache.openwhisk.core.invoker.{Invoker, NamespaceBlacklist} +import org.apache.openwhisk.core.service.{GetLease, Lease, RegisterData, UnregisterData} +import org.apache.openwhisk.http.Messages +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.junit.JUnitRunner +import org.scalatest.{Assertion, BeforeAndAfterAll, FlatSpecLike, Matchers} +import spray.json.DefaultJsonProtocol._ +import spray.json.{JsObject, _} + +import scala.collection.mutable +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise} +@RunWith(classOf[JUnitRunner]) +class FunctionPullingContainerProxyTests + extends TestKit(ActorSystem("FunctionPullingContainerProxy")) + with ImplicitSender + with FlatSpecLike + with Matchers + with MockFactory + with BeforeAndAfterAll + with StreamLogging { + + override def afterAll: Unit = { + client.close() + TestKit.shutdownActorSystem(system) + super.afterAll() + } + + implicit val mat = ActorMaterializer() + implicit val ece: ExecutionContextExecutor = system.dispatcher + + val timeout = 20.seconds + val longTimeout = timeout * 2 + + val log = logging + + val defaultUserMemory: ByteSize = 1024.MB + + val invocationNamespace = EntityName("invocationSpace") + + val schedulerHost = "127.17.0.1" + + val rpcPort = 13001 + + val neverMatchNamespace = EntityName("neverMatchNamespace") + + val uuid = UUID() + + val poolConfig = + ContainerPoolConfig( + 2.MB, + 0.5, + false, + FiniteDuration(2, TimeUnit.SECONDS), + FiniteDuration(1, TimeUnit.MINUTES), + None, + 100, + 3, + false, + 1.second) + + val timeoutConfig = 
ContainerProxyTimeoutConfig(5.seconds, 5.seconds) + + val messageTransId = TransactionId(TransactionId.testing.meta.id) + + val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) + + val memoryLimit = 256.MB + + val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec) + + val fqn = FullyQualifiedEntityName(action.namespace, action.name, Some(action.version)) + + val message = ActivationMessage( + messageTransId, + action.fullyQualifiedName(true), + action.rev, + Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret()), Set.empty), + ActivationId.generate(), + ControllerInstanceId("0"), + blocking = false, + content = None) + + val entityStore = WhiskEntityStore.datastore() + + val invokerHealthManager = TestProbe() + + val testContainerId = ContainerId("testcontainerId") + + val testLease = Lease(60, 10) + + def keepAliveService: ActorRef = + system.actorOf(Props(new Actor { + override def receive: Receive = { + case GetLease => + Thread.sleep(1000) + sender() ! testLease + } + })) + + def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2) + + val client: Client = { + val hostAndPorts = "172.17.0.1:2379" + Client.forEndpoints(hostAndPorts).withPlainText().build() + } + + class MockEtcdClient() extends EtcdClient(client)(ece) { + val etcdStore = MutableMap[String, String]() + var putFlag = false + override def put(key: String, value: String): Future[PutResponse] = { + etcdStore.update(key, value) + putFlag = true + Future.successful( + PutResponse.newBuilder().setPrevKv(KeyValue.newBuilder().setKey(key).setValue(value).build()).build()) + } + + override def del(key: String): Future[DeleteRangeResponse] = { + etcdStore.remove(key) + val res = DeleteRangeResponse.getDefaultInstance + Future.successful( + DeleteRangeResponse.newBuilder().setPrevKvs(0, KeyValue.newBuilder().setKey(key).build()).build()) + } + } + + val initInterval = { + val now = messageTransId.meta.start.plusMillis(50) // this is the queue time for cold start + Interval(now, now.plusMillis(100)) + } + + val runInterval = { + val now = initInterval.end.plusMillis(75) // delay between init and run + Interval(now, now.plusMillis(200)) + } + + val errorInterval = { + val now = initInterval.end.plusMillis(75) // delay between init and run + Interval(now, now.plusMillis(150)) + } + + /** Creates a client and a factory returning this ref of the client. 
*/ + def testClient + : (TestProbe, + (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, String, Int, ContainerId) => ActorRef) = { + val client = TestProbe() + val factory = + (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: String, _: Int, _: ContainerId) => + client.ref + (client, factory) + } + + /** get WhiskAction*/ + def getWhiskAction(response: Future[WhiskAction]) = LoggedFunction { + (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) => + response + } + + /** Creates an inspectable factory */ + def createFactory(response: Future[Container]) = LoggedFunction { + (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) => + response + } + + trait LoggedAcker extends ActiveAck { + def calls = + mutable.Buffer[(TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, AcknowledegmentMessage)]() + + def verifyAnnotations(activation: WhiskActivation, a: ExecutableWhiskAction) = { + activation.annotations.get("limits") shouldBe Some(a.limits.toJson) + activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) + activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson) + } + } + + /** Creates an inspectable version of the ack method, which records all calls in a buffer */ + def createAcker(a: ExecutableWhiskAction = action) = new LoggedAcker { + val acker = LoggedFunction { + (_: TransactionId, _: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: AcknowledegmentMessage) => + Future.successful(()) + } + + override def calls = acker.calls + + override def apply(tid: TransactionId, + activation: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: ControllerInstanceId, + userId: UUID, + acknowledegment: AcknowledegmentMessage): Future[Any] = { + verifyAnnotations(activation, a) + acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment) + } + } + + /** Creates an synchronized inspectable version of the ack method, which records all calls in a buffer */ + def createAckerForNamespaceBlacklist(a: ExecutableWhiskAction = action, + mockNamespaceBlacklist: MockNamespaceBlacklist) = new LoggedAcker { + val acker = SynchronizedLoggedFunction { + (_: TransactionId, _: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: AcknowledegmentMessage) => + Future.successful(()) + } + + override def calls = acker.calls + + override def verifyAnnotations(activation: WhiskActivation, a: ExecutableWhiskAction): Assertion = { + activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) + } + + override def apply(tid: TransactionId, + activation: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: ControllerInstanceId, + userId: UUID, + acknowledegment: AcknowledegmentMessage): Future[Any] = { + verifyAnnotations(activation, a) + acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment) + } + } + + /** Registers the transition callback and expects the first message */ + def registerCallback(c: ActorRef, probe: TestProbe) = { + c ! 
SubscribeTransitionCallBack(probe.ref) + probe.expectMsg(CurrentState(c, Uninitialized)) + } + + def createStore = LoggedFunction { + (transid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => + Future.successful(()) + } + + def getLiveContainerCount(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) => + Future.successful(count) + } + + def getWarmedContainerLimit(limit: Future[(Int, FiniteDuration)]) = LoggedFunction { (_: String) => + limit + } + + class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () => Unit) extends Invoker.LogsCollector { + val collector = LoggedFunction { + (transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction) => + response + } + + def calls = collector.calls + + override def apply(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction) = { + invokeCallback() + collector(transid, user, activation, container, action) + } + } + + def createCollector(response: Future[ActivationLogs] = Future.successful(ActivationLogs()), + invokeCallback: () => Unit = () => ()) = + new LoggedCollector(response, invokeCallback) + + /** Expect a ReadyToWork message with prewarmed data */ + def expectPreWarmed(kind: String, probe: TestProbe) = probe.expectMsgPF() { + case ReadyToWork(PreWarmData(_, kind, memoryLimit, _)) => true + } + + /** Expect an Initialized message with initialized container data */ + def expectInitialized(probe: TestProbe) = probe.expectMsgPF() { + case Initialized(InitializedData(_, _, _, _)) => true + } + + /** Pre-warms the given state-machine, assumes good cases */ + def preWarm(machine: ActorRef, probe: TestProbe) = { + machine ! 
Start(exec, memoryLimit) + probe.expectMsg(Transition(machine, Uninitialized, CreatingContainer)) + expectPreWarmed(exec.kind, probe) + probe.expectMsg(Transition(machine, CreatingContainer, ContainerCreated)) + } + + behavior of "FunctionPullingContainerProxy" + + it should "create a prewarm container" in within(timeout) { + implicit val transid: TransactionId = messageTransId + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val probe = TestProbe() + + val (_, clientFactory) = testClient + + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + createAcker(), + store, + collector, + counter, + limit, + InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + factory.calls should have size 1 + val (tid, name, _, _, memory, _, _) = factory.calls(0) + tid shouldBe TransactionId.invokerWarmup + name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind""" + memory shouldBe memoryLimit + } + + it should "run actions to a started prewarm container with get activationMessage successfully" in within(timeout) { + implicit val transid: TransactionId = messageTransId + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + preWarm(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, transid) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 0 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "run actions to a started prewarm container with get no activationMessage" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + preWarm(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, RetryRequestActivation) + client.expectMsg(RequestActivation()) + client.send(machine, StateTimeout) + client.send(machine, RetryRequestActivation) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing)) + client.expectMsg(StopClientProxy) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "run actions to a cold start container with get activationMessage successfully" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + val (tid, name, _, _, memory, _, _) = factory.calls(0) + tid shouldBe TransactionId.invokerColdstart + name should fullyMatch regex """wskmyname\d+_\d+_actionSpace_actionName""" + memory shouldBe memoryLimit + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 0 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "run actions to a cold start container with get no activationMessage" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, RetryRequestActivation) + client.expectMsg(RequestActivation()) + client.send(machine, StateTimeout) + client.send(machine, RetryRequestActivation) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing)) + client.expectMsg(StopClientProxy) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "not run activations and destroy the prewarm container when get client failed" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + + def clientFactory(f: ActorRefFactory, + invocationNamespace: String, + fqn: FullyQualifiedEntityName, + d: DocRevision, + schedulerHost: String, + rpcPort: Int, + c: ContainerId): ActorRef = { + throw new Exception("failed to create activation client") + } + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + probe.expectMsg(ContainerRemoved(true)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "not run activations and don't create cold start container when get client failed" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + + def clientFactory(f: ActorRefFactory, + invocationNamespace: String, + fqn: FullyQualifiedEntityName, + d: DocRevision, + schedulerHost: String, + rpcPort: Int, + c: ContainerId): ActorRef = { + throw new Exception("failed to create activation client") + } + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + probe.expectMsg(ContainerRemoved(true)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "destroy container proxy when the client closed in CreatingClient" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientClosed) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, CreatingClient, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "destroy container proxy when the client closed in ContainerCreated" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, ClientClosed) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "destroy container proxy when the client closed in Running" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + 
+ machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + client.send(machine, message) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + client.send(machine, ClientClosed) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 2 + collector.calls.length shouldBe 2 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 2 + store.calls.length shouldBe 2 + } + } + + it should "destroy container proxy when stopping due to timeout" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(2) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + probe watch machine + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + machine ! StateTimeout + client.send(machine, RetryRequestActivation) + probe.expectMsg(Transition(machine, Running, Pausing)) + probe.expectMsgType[ContainerIsPaused] + probe.expectMsg(Transition(machine, Pausing, Paused)) + + machine ! 
StateTimeout + client.expectMsg(StopClientProxy) + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing)) + client.send(machine, ClientClosed) + + probe expectTerminated machine + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "destroy container proxy even if there is no message from the client when stopping due to timeout" in within( + timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(2) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + probe watch machine + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + machine ! 
StateTimeout + client.send(machine, RetryRequestActivation) + probe.expectMsg(Transition(machine, Running, Pausing)) + probe.expectMsgType[ContainerIsPaused] + probe.expectMsg(Transition(machine, Pausing, Paused)) + + client.send(machine, StateTimeout) + client.expectMsg(StopClientProxy) + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing)) + + probe expectTerminated (machine, timeout) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "destroy container proxy even if there is only 1 container, if a Remove message arrives during the keep timeout" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + probe watch machine + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + machine ! 
StateTimeout + client.send(machine, RetryRequestActivation) + probe.expectMsg(Transition(machine, Running, Pausing)) + probe.expectMsgType[ContainerIsPaused] + probe.expectMsg(Transition(machine, Pausing, Paused)) + + client.send(machine, StateTimeout) + client.send(machine, v2.Remove) + client.expectMsg(StopClientProxy) + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing)) + + probe expectTerminated (machine, timeout) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "Keep the container if live count is less than warmed container keeping count configuration" in within( + timeout) { + stream.reset() + val warmedContainerTimeout = 10.seconds + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((2, warmedContainerTimeout))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + probe watch machine + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + machine ! StateTimeout + client.send(machine, RetryRequestActivation) + probe.expectMsg(Transition(machine, Running, Pausing)) + probe.expectMsgType[ContainerIsPaused] + probe.expectMsg(Transition(machine, Pausing, Paused)) + + machine ! 
StateTimeout + probe.expectNoMessage(warmedContainerTimeout) + probe.expectMsgAllOf(warmedContainerTimeout, ContainerRemoved(true), Transition(machine, Paused, Removing)) + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "Remove the container if live count is greater than warmed container keeping count configuration" in within( + timeout) { + stream.reset() + val warmedContainerTimeout = 10.seconds + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(2) + val limit = getWarmedContainerLimit(Future.successful((1, warmedContainerTimeout))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + probe watch machine + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + machine ! StateTimeout + client.send(machine, RetryRequestActivation) + probe.expectMsg(Transition(machine, Running, Pausing)) + probe.expectMsgType[ContainerIsPaused] + probe.expectMsg(Transition(machine, Pausing, Paused)) + + machine ! 
StateTimeout + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing)) + probe expectTerminated (machine, timeout) + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + } + + it should "pause itself when timeout and recover when got a new Initialize" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + val instanceId = InvokerInstanceId(0, userMemory = defaultUserMemory) + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + instanceId, + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + probe watch machine + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + dataManagementService.expectMsg( + RegisterData( + ContainerKeys.existingContainers( + invocationNamespace.asString, + fqn, + DocRevision.empty, + Some(instanceId), + Some(testContainerId)), + "")) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + machine ! StateTimeout + client.send(machine, RetryRequestActivation) + probe.expectMsg(Transition(machine, Running, Pausing)) + probe.expectMsgType[ContainerIsPaused] + dataManagementService.expectMsgAllOf( + RegisterData( + ContainerKeys + .warmedContainers(invocationNamespace.asString, fqn, DocRevision.empty, instanceId, testContainerId), + ""), + UnregisterData( + ContainerKeys.existingContainers( + invocationNamespace.asString, + fqn, + DocRevision.empty, + Some(instanceId), + Some(testContainerId)))) + probe.expectMsg(Transition(machine, Pausing, Paused)) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + dataManagementService.expectMsgAllOf( + UnregisterData( + ContainerKeys + .warmedContainers(invocationNamespace.asString, fqn, DocRevision.empty, instanceId, testContainerId)), + RegisterData( + ContainerKeys.existingContainers( + invocationNamespace.asString, + fqn, + DocRevision.empty, + Some(instanceId), + Some(testContainerId)), + "")) + inAnyOrder { + probe.expectMsg(Transition(machine, Paused, Running)) + probe.expectMsgType[Resumed] + } + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 0 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 1 + } + + } + + it should "not collect logs if the log-limit is set to 0" in within(timeout) { + val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB))) + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(noLogsAction.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker(noLogsAction) + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, noLogsAction, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + client.send(machine, message) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount should be > 1 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 0 + acker.calls.length should be > 1 + store.calls.length should be > 1 + } + } + + it should "complete the transaction and abort if container creation fails" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val exception = new Exception() + val container = new TestContainer + val factory = createFactory(Future.failed(exception)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (_, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsgAllOf( + Transition(machine, Uninitialized, CreatingClient), + ContainerCreationFailed(exception), + ContainerRemoved(true)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 0 + acker.calls.length shouldBe 0 + store.calls.length shouldBe 0 + } + } + + it should "complete the transaction and destroy the prewarm container on a failed init" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer { + override def initialize(initializer: JsObject, + timeout: FiniteDuration, + maxConcurrent: Int, + entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = { + initializeCount += 1 + Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom"))) + } + } + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 0 // should not run the action + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + val activation = acker.calls(0)._2 + activation.response shouldBe ActivationResponse.developerError("boom") + activation.annotations + .get(WhiskActivation.initTimeAnnotation) + .get + .convertTo[Int] shouldBe initInterval.duration.toMillis + store.calls.length shouldBe 1 + } + } + + it should "complete the transaction and destroy the cold start container on a failed init" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer { + override def initialize(initializer: JsObject, + timeout: FiniteDuration, + maxConcurrent: Int, + entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = { + initializeCount += 1 + Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom"))) + } + } + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 0 // should not run the action + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + val activation = acker.calls(0)._2 + activation.response shouldBe ActivationResponse.developerError("boom") + activation.annotations + .get(WhiskActivation.initTimeAnnotation) + .get + .convertTo[Int] shouldBe initInterval.duration.toMillis + store.calls.length shouldBe 1 + } + } + + it should "complete the transaction and destroy the container on a failed run IFF failure was containerError" in within( + timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer { + override def run(parameters: JsObject, + environment: JsObject, + timeout: FiniteDuration, + concurrent: Int, + reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { + atomicRunCount.incrementAndGet() + Future.successful((initInterval, ActivationResponse.developerError(("boom")))) + } + } + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + acker.calls(0)._2.response shouldBe ActivationResponse.developerError("boom") + store.calls.length shouldBe 1 + } + } + + it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within( + timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer { + override def run(parameters: JsObject, + environment: JsObject, + timeout: FiniteDuration, + concurrent: Int, + reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { + atomicRunCount.incrementAndGet() + //every other run fails + if (runCount % 2 == 0) { + Future.successful((runInterval, ActivationResponse.success())) + } else { + Future.successful((errorInterval, ActivationResponse.applicationError(("boom")))) + } + } + } + + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + client.send(machine, message) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount should be > 1 + collector.calls.length should be > 1 + container.destroyCount shouldBe 0 + acker.calls.length should be > 1 + store.calls.length should be > 1 + + val initErrorActivation = acker.calls(0)._2 + initErrorActivation.duration shouldBe Some((initInterval.duration + errorInterval.duration).toMillis) + initErrorActivation.annotations + .get(WhiskActivation.initTimeAnnotation) + .get + .convertTo[Int] shouldBe initInterval.duration.toMillis + initErrorActivation.annotations + .get(WhiskActivation.waitTimeAnnotation) + .get + .convertTo[Int] shouldBe + Interval(message.transid.meta.start, initInterval.start).duration.toMillis + + val runOnlyActivation = acker.calls(1)._2 + runOnlyActivation.duration shouldBe Some(runInterval.duration.toMillis) + runOnlyActivation.annotations.get(WhiskActivation.initTimeAnnotation) shouldBe empty + runOnlyActivation.annotations.get(WhiskActivation.waitTimeAnnotation).get.convertTo[Int] shouldBe { + Interval(message.transid.meta.start, runInterval.start).duration.toMillis + } + } + } + + it should "complete the transaction and destroy the container if log reading failed" in { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + + val partialLogs = Vector("this log line made it", Messages.logFailure) + val collector = createCollector(Future.failed(LogCollectingException(ActivationLogs(partialLogs)))) + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + acker.calls(0)._2.response shouldBe ActivationResponse.success() + store.calls.length shouldBe 1 + store.calls(0)._2.logs shouldBe ActivationLogs(partialLogs) + } + } + + it should "complete the transaction and destroy the container if log reading failed terminally" in { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector(Future.failed(new Exception)) + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing)) + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount shouldBe 1 + collector.calls.length shouldBe 1 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + acker.calls(0)._2.response shouldBe ActivationResponse.success() + store.calls.length shouldBe 1 + store.calls(0)._2.logs shouldBe ActivationLogs(Vector(Messages.logFailure)) + } + } + + it should "save the container id to etcd when don't destroy the container" in within(timeout) { + implicit val transid: TransactionId = messageTransId + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val instance = InvokerInstanceId(0, userMemory = defaultUserMemory) + val dataManagementService = TestProbe() + + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + instance, + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
+    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+    client.expectMsg(StartClient)
+    client.send(machine, ClientCreationCompleted())
+
+    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+    expectInitialized(probe)
+    client.expectMsg(RequestActivation())
+    client.send(machine, message)
+
+    probe.expectMsg(Transition(machine, ClientCreated, Running))
+    client.expectMsg(ContainerWarmed)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls.length shouldBe 1
+      container.destroyCount shouldBe 0
+      acker.calls.length shouldBe 1
+      store.calls.length shouldBe 1
+      dataManagementService.expectMsg(RegisterData(
+        s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}",
+        ""))
+    }
+  }
+
+  it should "save the container id to etcd first and delete it when the container is destroyed" in within(timeout) {
+    implicit val transid: TransactionId = messageTransId
+    val authStore = mock[ArtifactWhiskAuthStore]
+    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+    val get = getWhiskAction(Future(action.toWhiskAction))
+    val instance = InvokerInstanceId(0, userMemory = defaultUserMemory)
+    val dataManagementService = TestProbe()
+
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+    val counter = getLiveContainerCount(2)
+    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+    val (client, clientFactory) = testClient
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        FunctionPullingContainerProxy
+          .props(
+            factory,
+            entityStore,
+            namespaceBlacklist,
+            get,
+            dataManagementService.ref,
+            clientFactory,
+            acker,
+            store,
+            collector,
+            counter,
+            limit,
+            instance,
+            invokerHealthManager.ref,
+            poolConfig,
+            timeoutConfig))
+
+    registerCallback(machine, probe)
+
+    machine !
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
+    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+    client.expectMsg(StartClient)
+    client.send(machine, ClientCreationCompleted())
+
+    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+    expectInitialized(probe)
+    client.expectMsg(RequestActivation())
+    client.send(machine, message)
+
+    dataManagementService.expectMsg(RegisterData(
+      s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}",
+      ""))
+
+    probe.expectMsg(Transition(machine, ClientCreated, Running))
+    client.expectMsg(ContainerWarmed)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+    client.send(machine, StateTimeout)
+    client.send(machine, RetryRequestActivation)
+
+    probe.expectMsg(Transition(machine, Running, Pausing))
+    probe.expectMsgType[ContainerIsPaused]
+    probe.expectMsg(Transition(machine, Pausing, Paused))
+
+    client.send(machine, StateTimeout)
+    client.expectMsg(StopClientProxy)
+    probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls.length shouldBe 1
+      container.destroyCount shouldBe 1
+      acker.calls.length shouldBe 1
+      store.calls.length shouldBe 1
+      dataManagementService.expectMsg(UnregisterData(
+        s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}"))
+    }
+  }
+
+  it should "not destroy itself when a timeout occurs but a new activation message arrives" in within(timeout) {
+    implicit val transid: TransactionId = messageTransId
+    val authStore = mock[ArtifactWhiskAuthStore]
+    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+    val get = getWhiskAction(Future(action.toWhiskAction))
+    val instance = InvokerInstanceId(0, userMemory = defaultUserMemory)
+    val dataManagementService = TestProbe()
+
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+    val counter = getLiveContainerCount(1)
+    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+    val (client, clientFactory) = testClient
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        FunctionPullingContainerProxy
+          .props(
+            factory,
+            entityStore,
+            namespaceBlacklist,
+            get,
+            dataManagementService.ref,
+            clientFactory,
+            acker,
+            store,
+            collector,
+            counter,
+            limit,
+            instance,
+            invokerHealthManager.ref,
+            poolConfig,
+            timeoutConfig))
+
+    registerCallback(machine, probe)
+
+    machine !
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
+    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+    client.expectMsg(StartClient)
+    client.send(machine, ClientCreationCompleted())
+
+    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+    expectInitialized(probe)
+    client.expectMsg(RequestActivation())
+    client.send(machine, message)
+
+    dataManagementService.expectMsg(RegisterData(
+      s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}",
+      ""))
+
+    probe.expectMsg(Transition(machine, ClientCreated, Running))
+    client.expectMsg(ContainerWarmed)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+    client.send(machine, StateTimeout) // make container time out
+    client.send(machine, message)
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 2
+      collector.calls.length shouldBe 2
+      container.destroyCount shouldBe 0
+      acker.calls.length shouldBe 2
+      store.calls.length shouldBe 2
+      dataManagementService.expectNoMessage()
+    }
+  }
+
+  it should "get the latest NamespaceBlacklist when it is updated in the db" in within(timeout) {
+    stream.reset()
+    implicit val transid: TransactionId = messageTransId
+    val authStore = mock[ArtifactWhiskAuthStore]
+    val mockNamespaceBlacklist: MockNamespaceBlacklist = new MockNamespaceBlacklist(authStore)
+    val get = getWhiskAction(Future(action.toWhiskAction))
+    val dataManagementService = TestProbe()
+
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAckerForNamespaceBlacklist(mockNamespaceBlacklist = mockNamespaceBlacklist)
+    val store = createStore
+    val collector = createCollector()
+    val counter = getLiveContainerCount(1)
+    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+    val (client, clientFactory) = testClient
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        FunctionPullingContainerProxy
+          .props(
+            factory,
+            entityStore,
+            mockNamespaceBlacklist,
+            get,
+            dataManagementService.ref,
+            clientFactory,
+            acker,
+            store,
+            collector,
+            counter,
+            limit,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            invokerHealthManager.ref,
+            poolConfig,
+            timeoutConfig))
+    registerCallback(machine, probe)
+    preWarm(machine, probe)
+
+    mockNamespaceBlacklist.refreshBlacklist()
+    //the namespace invocationSpace will be added to the namespaceBlacklist
+    mockNamespaceBlacklist.refreshBlacklist()
+    machine !
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing)) + + stream.toString should include(s"namespace invocationSpace was blocked in containerProxy") + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 0 + container.runCount shouldBe 0 + collector.calls.length shouldBe 0 + container.destroyCount shouldBe 1 + acker.calls.length shouldBe 1 + store.calls.length shouldBe 0 + } + } + + it should "block further invocations after invocation space is added in the namespace blacklist" in within(timeout) { + stream.reset() + implicit val transid: TransactionId = messageTransId + val authStore = mock[ArtifactWhiskAuthStore] + val mockNamespaceBlacklist: MockNamespaceBlacklist = new MockNamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val acker = createAckerForNamespaceBlacklist(mockNamespaceBlacklist = mockNamespaceBlacklist) + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + mockNamespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + + registerCallback(machine, probe) + preWarm(machine, probe) + + //first refresh, the namespace:invocationSpace is not in namespaceBlacklist, so activations are executed successfully + mockNamespaceBlacklist.refreshBlacklist() + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + stream.toString should include(s"namespace ${message.user.namespace.name} is not in the namespaceBlacklist") + + //refresh again, the namespace:invocationSpace will be added to namespaceBlacklist + stream.reset() + Thread.sleep(1000) + mockNamespaceBlacklist.refreshBlacklist() + client.expectMsg(ContainerWarmed) + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + client.send(machine, message) + + probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing)) + client.expectMsg(StopClientProxy) + stream.toString should include(s"namespace ${message.user.namespace.name} was blocked in containerProxy") + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount should be >= 1 + collector.calls.length should be >= 1 + container.destroyCount shouldBe 1 + acker.calls.length should be >= 1 + store.calls.length should be >= 1 + } + } + + it should "not timeout when running long time action" in within(longTimeout) { + implicit val transid: TransactionId = messageTransId + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val container = new TestContainer { + override def run(parameters: JsObject, + environment: JsObject, + timeout: FiniteDuration, + concurrent: Int, + reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { + Thread.sleep((timeoutConfig.pauseGrace + 1.second).toMillis) // 6 sec actions + super.run(parameters, environment, timeout, concurrent) + } + } + val factory = createFactory(Future.successful(container)) + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + client.expectMsgPF(8.seconds) { // wait more than container running time(6 seconds) + case RequestActivation(Some(_), None) => true + } + client.send(machine, message) + client.expectMsgPF(8.seconds) { // wait more than container running time(6 seconds) + case RequestActivation(Some(_), None) => true + } + + awaitAssert { + factory.calls should have size 1 + container.initializeCount shouldBe 1 + container.runCount should be > 1 + collector.calls.length should be > 1 + container.destroyCount shouldBe 0 + acker.calls.length should be > 1 + store.calls.length should be > 1 + } + } + + it should "start tcp ping to containers when action healthcheck enabled" in within(timeout) { + implicit val transid: TransactionId = messageTransId + + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + val container = new TestContainer + val factory = createFactory(Future.successful(container)) + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + val healthchecks = healthchecksConfig(true) + + val probe = TestProbe() + val tcpProbe = TestProbe() + + val (client, clientFactory) = testClient + + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + createAcker(), + store, + collector, + counter, + limit, + InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig, + healthchecks, + tcp = Some(tcpProbe.ref))) + registerCallback(machine, probe) + preWarm(machine, probe) + + tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 12345))) + tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 12345))) + tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 12345))) + + //pings should repeat till the container goes into Running state + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, transid) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + // Receive any unhandled messages + tcpProbe.receiveWhile(3.seconds, 200.milliseconds, 10) { + case Connect(_, None, Nil, None, false) => + true + } + + tcpProbe.expectNoMessage(healthchecks.checkPeriod + 100.milliseconds) + + awaitAssert { + factory.calls should have size 1 + } + } + + it should "reschedule the job to the queue if /init fails connection on ClientCreated" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + + // test container + val initPromise = Promise[Interval]() + val container = new TestContainer(initPromise = Some(initPromise)) + val factory = createFactory(Future.successful(container)) + + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + // throw health error + initPromise.failure(ContainerHealthError(messageTransId, "intentional failure")) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + // message should be rescheduled + val fqn = action.fullyQualifiedName(withVersion = true) + val rev = action.rev + client.expectMsg(RescheduleActivation(invocationNamespace.asString, fqn, rev, message)) + } + + it should "reschedule the job to the queue if /run fails connection on ClientCreated" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + + // test container + val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]()) + val container = new TestContainer(runPromises = runPromises) + val factory = createFactory(Future.successful(container)) + + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + // throw health error + runPromises.head.failure(ContainerHealthError(messageTransId, "intentional failure")) + + machine ! 
Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + // message should be rescheduled + val fqn = action.fullyQualifiedName(withVersion = true) + val rev = action.rev + client.expectMsg(ContainerWarmed) + client.expectMsg(RescheduleActivation(invocationNamespace.asString, fqn, rev, message)) + } + + it should "reschedule the job to the queue if /run fails connection on Running" in within(timeout) { + val authStore = mock[ArtifactWhiskAuthStore] + val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore) + val get = getWhiskAction(Future(action.toWhiskAction)) + val dataManagementService = TestProbe() + + val acker = createAcker() + val store = createStore + val collector = createCollector() + val counter = getLiveContainerCount(1) + val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds))) + + // test container + val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]()) + val container = new TestContainer(runPromises = runPromises) + val factory = createFactory(Future.successful(container)) + + val (client, clientFactory) = testClient + + val probe = TestProbe() + val machine = + probe.childActorOf( + FunctionPullingContainerProxy + .props( + factory, + entityStore, + namespaceBlacklist, + get, + dataManagementService.ref, + clientFactory, + acker, + store, + collector, + counter, + limit, + InvokerInstanceId(0, userMemory = defaultUserMemory), + invokerHealthManager.ref, + poolConfig, + timeoutConfig)) + registerCallback(machine, probe) + preWarm(machine, probe) + + // pass first request to become Running state + runPromises(0).success(runInterval, ActivationResponse.success()) + + // throw health error + runPromises(1).failure(ContainerHealthError(messageTransId, "intentional failure")) + + machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId) + probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient)) + client.expectMsg(StartClient) + client.send(machine, ClientCreationCompleted()) + + probe.expectMsg(Transition(machine, CreatingClient, ClientCreated)) + expectInitialized(probe) + client.expectMsg(RequestActivation()) + client.send(machine, message) + + probe.expectMsg(Transition(machine, ClientCreated, Running)) + client.expectMsg(ContainerWarmed) + + client.expectMsgPF() { + case RequestActivation(Some(_), None) => true + } + client.send(machine, message) + + // message should be rescheduled + val fqn = action.fullyQualifiedName(withVersion = true) + val rev = action.rev + client.expectMsg(RescheduleActivation(invocationNamespace.asString, fqn, rev, message)) + } + + /** + * Implements all the good cases of a perfect run to facilitate error case overriding. 
+ */ + class TestContainer(initPromise: Option[Promise[Interval]] = None, + runPromises: Seq[Promise[(Interval, ActivationResponse)]] = Seq.empty, + apiKeyMustBePresent: Boolean = true) + extends Container { + protected val id = testContainerId + protected[core] val addr = ContainerAddress("0.0.0.0", 12345) + protected implicit val logging: Logging = log + protected implicit val ec: ExecutionContext = system.dispatcher + override implicit protected val as: ActorSystem = system + var destroyCount = 0 + var initializeCount = 0 + val atomicRunCount = new AtomicInteger(0) //need atomic tracking since we will test concurrent runs + var atomicLogsCount = new AtomicInteger(0) + + def runCount = atomicRunCount.get() + + override def destroy()(implicit transid: TransactionId): Future[Unit] = { + destroyCount += 1 + super.destroy() + } + override def initialize(initializer: JsObject, + timeout: FiniteDuration, + maxConcurrent: Int, + entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = { + initializeCount += 1 + val envField = "env" + + (initializer.fields - envField) shouldBe action.containerInitializer().fields - envField + + timeout shouldBe action.limits.timeout.duration + + val initializeEnv = initializer.fields(envField).asJsObject + + initializeEnv.fields("__OW_NAMESPACE") shouldBe invocationNamespace.name.toJson + initializeEnv.fields("__OW_ACTION_NAME") shouldBe message.action.qualifiedNameWithLeadingSlash.toJson + initializeEnv.fields("__OW_ACTION_VERSION") shouldBe message.action.version.toJson + initializeEnv.fields("__OW_ACTIVATION_ID") shouldBe message.activationId.toJson + initializeEnv.fields("__OW_TRANSACTION_ID") shouldBe transid.id.toJson + + val convertedAuthKey = message.user.authkey.toEnvironment.fields.map(f => ("__OW_" + f._1.toUpperCase(), f._2)) + val authEnvironment = initializeEnv.fields.filterKeys(convertedAuthKey.contains) + convertedAuthKey shouldBe authEnvironment + + val deadline = Instant.ofEpochMilli(initializeEnv.fields("__OW_DEADLINE").convertTo[String].toLong) + val maxDeadline = Instant.now.plusMillis(timeout.toMillis) + + // The deadline should be in the future but must be smaller than or equal + // a freshly computed deadline, as they get computed slightly after each other + deadline should (be <= maxDeadline and be >= Instant.now) + + initPromise.map(_.future).getOrElse(Future.successful(initInterval)) + } + + override def run( + parameters: JsObject, + environment: JsObject, + timeout: FiniteDuration, + concurrent: Int, + reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { + val runCount = atomicRunCount.incrementAndGet() + environment.fields("namespace") shouldBe invocationNamespace.name.toJson + environment.fields("action_name") shouldBe message.action.qualifiedNameWithLeadingSlash.toJson + environment.fields("action_version") shouldBe message.action.version.toJson + environment.fields("activation_id") shouldBe message.activationId.toJson + environment.fields("transaction_id") shouldBe transid.id.toJson + val authEnvironment = environment.fields.filterKeys(message.user.authkey.toEnvironment.fields.contains).toMap + message.user.authkey.toEnvironment shouldBe authEnvironment.toJson.asJsObject + val deadline = Instant.ofEpochMilli(environment.fields("deadline").convertTo[String].toLong) + val maxDeadline = Instant.now.plusMillis(timeout.toMillis) + + // The deadline should be in the future but must be smaller than or equal + // a freshly computed deadline, as they get 
computed slightly after each other
+      deadline should (be <= maxDeadline and be >= Instant.now)
+
+      //return the future for this run (if runPromises is not empty), or a default response
+      runPromises
+        .lift(runCount - 1)
+        .map(_.future)
+        .getOrElse(Future.successful((runInterval, ActivationResponse.success())))
+    }
+
+    def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
+      atomicLogsCount.incrementAndGet()
+      Source.empty
+    }
+  }
+
+  abstract class ArtifactWhiskAuthStore extends ArtifactStore[WhiskAuth] {
+    override protected[core] implicit val executionContext: ExecutionContext = ece
+    override implicit val logging: Logging = log
+
+    override protected[core] def put(d: WhiskAuth)(implicit transid: TransactionId): Future[DocInfo] = ???
+
+    override protected[core] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = ???
+
+    override protected[core] def get[A <: WhiskAuth](doc: DocInfo,
+                                                     attachmentHandler: Option[(A, Attachments.Attached) => A])(
+      implicit transid: TransactionId,
+      ma: Manifest[A]): Future[A] = ???
+
+    override protected[core] def query(table: String,
+                                       startKey: List[Any],
+                                       endKey: List[Any],
+                                       skip: Int,
+                                       limit: Int,
+                                       includeDocs: Boolean,
+                                       descending: Boolean,
+                                       reduce: Boolean,
+                                       stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] =
+      ???
+
+    override protected[core] def count(table: String,
+                                       startKey: List[Any],
+                                       endKey: List[Any],
+                                       skip: Int,
+                                       stale: StaleParameter)(implicit transid: TransactionId): Future[Long] = ???
+
+    override protected[core] def putAndAttach[A <: WhiskAuth](d: A,
+                                                              update: (A, Attachments.Attached) => A,
+                                                              contentType: model.ContentType,
+                                                              docStream: Source[ByteString, _],
+                                                              oldAttachment: Option[Attachments.Attached])(
+      implicit transid: TransactionId): Future[(DocInfo, Attachments.Attached)] = ???
+
+    override protected[core] def readAttachment[T](
+      doc: DocInfo,
+      attached: Attachments.Attached,
+      sink: Sink[ByteString, Future[T]])(implicit transid: TransactionId): Future[T] = ???
+
+    override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
+      ???
+
+    override def shutdown(): Unit = ???
+  }
+
+  class MockNamespaceBlacklist(authStore: AuthStore) extends NamespaceBlacklist(authStore) {
+
+    var count = 0
+    var blacklist: Set[String] = Set.empty
+
+    override def isBlacklisted(identity: Identity): Boolean = {
+      blacklist.contains(identity.namespace.name.asString)
+    }
+
+    override def refreshBlacklist()(implicit ec: ExecutionContext, tid: TransactionId): Future[Set[String]] = {
+      count += 1
+      if (count == 1) {
+        //on the first refresh, only neverMatchNamespace is in the namespaceBlacklist
+        blacklist = Set(neverMatchNamespace.name)
+        Future.successful(blacklist)
+      } else {
+        //on subsequent refreshes, invocationNamespace is in the namespaceBlacklist
+        blacklist = Set(invocationNamespace.name)
+        Future.successful(blacklist)
+      }
+    }
+  }
+}
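For reference, nearly every test above drives the proxy through the same warm-path handshake before exercising its specific scenario. A condensed sketch of that common flow follows; machine, probe, client, and message are the suite's own fixtures, and the inline comments are the reading suggested by the surrounding assertions rather than documented behavior:

    machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient)) // proxy asks for an activation client
    client.expectMsg(StartClient)
    client.send(machine, ClientCreationCompleted())
    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
    expectInitialized(probe)                      // pool is told the container is initialized
    client.expectMsg(RequestActivation())         // proxy starts pulling activations
    client.send(machine, message)                 // feed one activation message
    probe.expectMsg(Transition(machine, ClientCreated, Running))
    client.expectMsg(ContainerWarmed)             // emitted after the first successful run
    client.expectMsgPF() { case RequestActivation(Some(_), None) => true } // later pulls carry the last run duration

The individual tests then diverge from this point, e.g. by sending StateTimeout to force pausing and removal, or by failing the init/run promises to trigger a RescheduleActivation back to the queue.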