diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 4b66e99cc55..724cd5971ed 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -129,44 +129,40 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, val memory = r.action.limits.memory.megabytes.MB val createdContainer = - // Is there enough space on the invoker for this action to be executed. - if (hasPoolSpaceFor(busyPool ++ prewarmedPool, memory)) { - // Schedule a job to a warm container - ContainerPool - .schedule(r.action, r.msg.user.namespace.name, freePool) - .map(container => (container, container._2.initingState)) //warmed, warming, and warmingCold always know their state - .orElse( - // There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container. - - // Is there enough space to create a new container or do other containers have to be removed? - if (hasPoolSpaceFor(busyPool ++ freePool ++ prewarmedPool, memory)) { + // Schedule a job to a warm container + ContainerPool + .schedule(r.action, r.msg.user.namespace.name, freePool) + .map(container => (container, container._2.initingState)) //warmed, warming, and warmingCold always know their state + .orElse( + // There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container. + // When take prewarm container, has no need to judge whether user memory is enough + takePrewarmContainer(r.action) + .map(container => (container, "prewarmed")) + .orElse { + // Is there enough space to create a new container or do other containers have to be removed? + if (hasPoolSpaceFor(busyPool ++ freePool ++ prewarmedPool, prewarmStartingPool, memory)) { + val container = Some(createContainer(memory), "cold") + incrementColdStartCount(kind, memory) + container + } else None + }) + .orElse( + // Remove a container and create a new one for the given job + ContainerPool + // Only free up the amount, that is really needed to free up + .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB) + .map(removeContainer) + // If the list had at least one entry, enough containers were removed to start the new container. After + // removing the containers, we are not interested anymore in the containers that have been removed. + .headOption + .map(_ => takePrewarmContainer(r.action) - .map(container => (container, "prewarmed")) - .orElse { - val container = Some(createContainer(memory), "cold") + .map(container => (container, "recreatedPrewarm")) + .getOrElse { + val container = (createContainer(memory), "recreated") incrementColdStartCount(kind, memory) container - } - } else None) - .orElse( - // Remove a container and create a new one for the given job - ContainerPool - // Only free up the amount, that is really needed to free up - .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB) - .map(removeContainer) - // If the list had at least one entry, enough containers were removed to start the new container. After - // removing the containers, we are not interested anymore in the containers that have been removed. - .headOption - .map(_ => - takePrewarmContainer(r.action) - .map(container => (container, "recreatedPrewarm")) - .getOrElse { - val container = (createContainer(memory), "recreated") - incrementColdStartCount(kind, memory) - container - })) - - } else None + })) createdContainer match { case Some(((actor, data), containerState)) => @@ -371,9 +367,15 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, /** Creates a new prewarmed container */ def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration]): Unit = { - val newContainer = childFactory(context) - prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit)) - newContainer ! Start(exec, memoryLimit, ttl) + if (hasPoolSpaceFor(busyPool ++ freePool ++ prewarmedPool, prewarmStartingPool, memoryLimit)) { + val newContainer = childFactory(context) + prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit)) + newContainer ! Start(exec, memoryLimit, ttl) + } else { + logging.warn( + this, + s"Cannot create prewarm container due to reach the invoker memory limit: ${poolConfig.userMemory.toMB}") + } } /** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */ @@ -439,8 +441,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, * @param memory The amount of memory to check. * @return true, if there is enough space for the given amount of memory. */ - def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = { - memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB + def hasPoolSpaceFor[A](pool: Map[A, ContainerData], + prewarmStartingPool: Map[A, (String, ByteSize)], + memory: ByteSize): Boolean = { + memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB } /** diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala index 3fd04143c06..bc103507a24 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala @@ -310,6 +310,19 @@ class ContainerPoolTests feed.expectMsg(MessageFeed.Processed) } + it should "not create prewarm container when used memory reaches the limit" in within(timeout) { + val (containers, factory) = testContainers(2) + val feed = TestProbe() + + val pool = + system.actorOf(ContainerPool + .props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit)))) + containers(0).expectMsg(Start(exec, memoryLimit)) + containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) + + containers(1).expectNoMessage(100.milliseconds) + } + /* * CONTAINER PREWARMING */ @@ -320,7 +333,7 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + .props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) } @@ -829,7 +842,7 @@ class ContainerPoolTests stream.reset() val prewarmExpirationCheckIntervel = 2.seconds val poolConfig = - ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel, None, 100) + ContainerPoolConfig(MemoryLimit.STD_MEMORY * 12, 0.5, false, prewarmExpirationCheckIntervel, None, 100) val minCount = 0 val initialCount = 2 val maxCount = 4