diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 072e4aebb15..7b80ec6a0ed 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -209,6 +209,7 @@ invoker: heap: "{{ invoker_heap | default('2g') }}" arguments: "{{ invoker_arguments | default('') }}" userMemory: "{{ invoker_user_memory | default('2048m') }}" + userCpus: "{{ invoker_user_cpus | default() }}" # Specify if it is allowed to deploy more than 1 invoker on a single machine. allowMultipleInstances: "{{ invoker_allow_multiple_instances | default(false) }}" # Specify if it should use docker-runc or docker to pause/unpause containers diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index a2089052c86..fba7bf94bd1 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -258,6 +258,7 @@ "CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}" "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}" "CONFIG_whisk_containerPool_userMemory": "{{ hostvars[groups['invokers'][invoker_index | int]].user_memory | default(invoker.userMemory) }}" + "CONFIG_whisk_containerPool_userCpus": "{{ invoker.userCpus | default() }}" "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}" "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc | default(false) | lower }}" "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala index e2cba13316d..a7272d0a2f6 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala @@ -25,7 +25,7 @@ import org.apache.openwhisk.spi.Spi import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import scala.math.max +import scala.math.{max, round} case class ContainerArgsConfig(network: String, dnsServers: Seq[String] = Seq.empty, @@ -55,6 +55,7 @@ case class ContainerPoolConfig(userMemory: ByteSize, prewarmPromotion: Boolean, memorySyncInterval: FiniteDuration, batchDeletionSize: Int, + userCpus: Option[Double] = None, prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) { require( concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0, @@ -62,6 +63,7 @@ case class ContainerPoolConfig(userMemory: ByteSize, require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0") require(batchDeletionSize > 0, "batch deletion size must be > 0") + require(userCpus.forall(_ > 0), "userCpus must be > 0") /** * The shareFactor indicates the number of containers that would share a single core, on average. @@ -73,6 +75,16 @@ case class ContainerPoolConfig(userMemory: ByteSize, // Grant more CPU to a container if it allocates more memory. def cpuShare(reservedMemory: ByteSize) = max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2 + + private val minContainerCpus = 0.01 // The minimum cpus allowed by docker is 0.01 + private val roundingMultiplier = 100000 + def cpuLimit(reservedMemory: ByteSize): Option[Double] = { + userCpus.map(c => { + val containerCpus = c / (userMemory.toBytes / reservedMemory.toBytes) + val roundedContainerCpus = round(containerCpus * roundingMultiplier).toDouble / roundingMultiplier // Only use decimal precision of 5 + max(roundedContainerCpus, minContainerCpus) + }) + } } case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) { @@ -116,8 +128,9 @@ trait ContainerFactory { userProvidedImage: Boolean, memory: ByteSize, cpuShares: Int, + cpuLimit: Option[Double], action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = { - createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares) + createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares, cpuLimit) } def createContainer(tid: TransactionId, @@ -125,7 +138,8 @@ trait ContainerFactory { actionImage: ExecManifest.ImageName, userProvidedImage: Boolean, memory: ByteSize, - cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] + cpuShares: Int, + cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] /** perform any initialization */ def init(): Unit diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala index e8fc86b12ce..d27df81096d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala @@ -123,7 +123,8 @@ class YARNContainerFactory(actorSystem: ActorSystem, actionImage: ExecManifest.ImageName, unuseduserProvidedImage: Boolean, unusedmemory: ByteSize, - unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + unusedcpuShares: Int, + unusedcpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = { implicit val timeout: Timeout = Timeout(containerStartTimeoutMS.milliseconds) //First send the create command to YARN, then with a different actor, wait for the container to be ready diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index 6fca2210f3d..f04dfdbac52 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -71,6 +71,8 @@ whisk { prewarm-promotion: false # if true, action can take prewarm container which has bigger memory memory-sync-interval: 1 second # period to sync memory info to etcd batch-deletion-size: 10 # batch size for removing containers when disable invoker, too big value may cause docker/k8s overload + # optional setting to specify the total allocatable cpus for all action containers, each container will get a fraction of this proportional to its allocated memory to limit the cpu + # user-cpus: 1 } kubernetes { diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index b5548b3b625..8261d49da2f 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -250,6 +250,7 @@ class ContainerProxy(factory: (TransactionId, Boolean, ByteSize, Int, + Option[Double], Option[ExecutableWhiskAction]) => Future[Container], sendActiveAck: ActiveAck, storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any], @@ -288,6 +289,7 @@ class ContainerProxy(factory: (TransactionId, job.exec.pull, job.memoryLimit, poolConfig.cpuShare(job.memoryLimit), + poolConfig.cpuLimit(job.memoryLimit), None) .map(container => PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow)))) @@ -307,6 +309,7 @@ class ContainerProxy(factory: (TransactionId, job.action.exec.pull, job.action.limits.memory.megabytes.MB, poolConfig.cpuShare(job.action.limits.memory.megabytes.MB), + poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB), Some(job.action)) // container factory will either yield a new container ready to execute the action, or @@ -978,6 +981,7 @@ object ContainerProxy { Boolean, ByteSize, Int, + Option[Double], Option[ExecutableWhiskAction]) => Future[Container], ack: ActiveAck, store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any], diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala index d624ba30a8c..378000b52ca 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala @@ -63,6 +63,7 @@ object DockerContainer { registryConfig: Option[RuntimesRegistryConfig] = None, memory: ByteSize = 256.MB, cpuShares: Int = 0, + cpuLimit: Option[Double] = None, environment: Map[String, String] = Map.empty, network: String = "bridge", dnsServers: Seq[String] = Seq.empty, @@ -101,6 +102,7 @@ object DockerContainer { dnsSearch.flatMap(d => Seq("--dns-search", d)) ++ dnsOptions.flatMap(d => Seq(dnsOptString, d)) ++ name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++ + cpuLimit.map(c => Seq("--cpus", c.toString)).getOrElse(Seq.empty) ++ params val registryConfigUrl = registryConfig.map(_.url).getOrElse("") diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala index b5247918643..c4e1f8effb7 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala @@ -57,12 +57,14 @@ class DockerContainerFactory(instance: InvokerInstanceId, extends ContainerFactory { /** Create a container using docker cli */ - override def createContainer(tid: TransactionId, - name: String, - actionImage: ExecManifest.ImageName, - userProvidedImage: Boolean, - memory: ByteSize, - cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + override def createContainer( + tid: TransactionId, + name: String, + actionImage: ExecManifest.ImageName, + userProvidedImage: Boolean, + memory: ByteSize, + cpuShares: Int, + cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = { val registryConfig = ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig) val image = if (userProvidedImage) Left(actionImage) else Right(actionImage) @@ -72,6 +74,7 @@ class DockerContainerFactory(instance: InvokerInstanceId, registryConfig = Some(registryConfig), memory = memory, cpuShares = cpuShares, + cpuLimit = cpuLimit, environment = Map("__OW_API_HOST" -> config.wskApiHost) ++ containerArgsConfig.extraEnvVarMap, network = containerArgsConfig.network, dnsServers = containerArgsConfig.dnsServers, diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala index b3102ccddcf..0251da7fe52 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala @@ -61,12 +61,14 @@ class StandaloneDockerContainerFactory(instance: InvokerInstanceId, parameters: private val pulledImages = new TrieMap[String, Boolean]() private val factoryConfig = loadConfigOrThrow[StandaloneDockerConfig](ConfigKeys.standaloneDockerContainerFactory) - override def createContainer(tid: TransactionId, - name: String, - actionImage: ExecManifest.ImageName, - userProvidedImage: Boolean, - memory: ByteSize, - cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + override def createContainer( + tid: TransactionId, + name: String, + actionImage: ExecManifest.ImageName, + userProvidedImage: Boolean, + memory: ByteSize, + cpuShares: Int, + cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = { //For standalone server usage we would also want to pull the OpenWhisk provided image so as to ensure if //local setup does not have the image then it pulls it down @@ -84,7 +86,7 @@ class StandaloneDockerContainerFactory(instance: InvokerInstanceId, parameters: } } else Future.successful(true) - pulled.flatMap(_ => super.createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)) + pulled.flatMap(_ => super.createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares, cpuLimit)) } override def init(): Unit = { diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala index 8292eba1eac..d627627d502 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala @@ -67,12 +67,14 @@ class KubernetesContainerFactory( Await.ready(cleaning, KubernetesContainerFactoryProvider.runtimeDeleteTimeout) } - override def createContainer(tid: TransactionId, - name: String, - actionImage: ImageName, - userProvidedImage: Boolean, - memory: ByteSize, - cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + override def createContainer( + tid: TransactionId, + name: String, + actionImage: ImageName, + userProvidedImage: Boolean, + memory: ByteSize, + cpuShares: Int, + cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = { val image = actionImage.resolveImageName(Some( ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig).url)) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala index c5fa5a6f2e9..b0fa73f35f3 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala @@ -185,6 +185,7 @@ class FunctionPullingContainerProxy( Boolean, ByteSize, Int, + Option[Double], Option[ExecutableWhiskAction]) => Future[Container], entityStore: ArtifactStore[WhiskEntity], namespaceBlacklist: NamespaceBlacklist, @@ -240,6 +241,7 @@ class FunctionPullingContainerProxy( job.exec.pull, job.memoryLimit, poolConfig.cpuShare(job.memoryLimit), + poolConfig.cpuLimit(job.memoryLimit), None) .map(container => PreWarmData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow))) .pipeTo(self) @@ -254,6 +256,7 @@ class FunctionPullingContainerProxy( job.action.exec.pull, job.action.limits.memory.megabytes.MB, poolConfig.cpuShare(job.action.limits.memory.megabytes.MB), + poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB), None) .andThen { case Failure(t) => @@ -1274,6 +1277,7 @@ object FunctionPullingContainerProxy { Boolean, ByteSize, Int, + Option[Double], Option[ExecutableWhiskAction]) => Future[Container], entityStore: ArtifactStore[WhiskEntity], namespaceBlacklist: NamespaceBlacklist, diff --git a/tests/dat/actions/zippedaction/package-lock.json b/tests/dat/actions/zippedaction/package-lock.json index 4bbddd42a9c..8f602b8f83e 100644 --- a/tests/dat/actions/zippedaction/package-lock.json +++ b/tests/dat/actions/zippedaction/package-lock.json @@ -1,18 +1,26 @@ { "name": "test-action", "version": "1.0.0", - "lockfileVersion": 1, + "lockfileVersion": 3, "requires": true, - "dependencies": { - "prog-quote": { + "packages": { + "": { + "name": "test-action", + "version": "1.0.0", + "license": "Apache 2.0", + "dependencies": { + "prog-quote": "2.0.0" + } + }, + "node_modules/prog-quote": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/prog-quote/-/prog-quote-2.0.0.tgz", "integrity": "sha1-TLBMeosV/zu/kxMQxCsBzSjcMB0=", - "requires": { + "dependencies": { "random-js": "1.0.8" } }, - "random-js": { + "node_modules/random-js": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/random-js/-/random-js-1.0.8.tgz", "integrity": "sha1-lo/WiabyXWwKrHZig94vaIycGQo=" diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala index b5bd8960de1..65b4effad55 100644 --- a/tests/src/test/scala/common/LoggedFunction.scala +++ b/tests/src/test/scala/common/LoggedFunction.scala @@ -92,6 +92,16 @@ class LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, } } +class LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, A4, A5, A6, A7, A8) => B) + extends Function8[A1, A2, A3, A4, A5, A6, A7, A8, B] { + val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6, A7, A8)]() + + override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6, v7: A7, v8: A8): B = { + calls += ((v1, v2, v3, v4, v5, v6, v7, v8)) + body(v1, v2, v3, v4, v5, v6, v7, v8) + } +} + class SynchronizedLoggedFunction1[A1, B](body: A1 => B) extends Function1[A1, B] { val calls = mutable.Buffer[A1]() @@ -157,6 +167,8 @@ object LoggedFunction { new LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body) def apply[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) => B) = new LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body) + def apply[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, A4, A5, A6, A7, A8) => B) = + new LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body) } object SynchronizedLoggedFunction { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala index baacd95c815..feebd2f05bb 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala @@ -140,7 +140,91 @@ class DockerContainerFactoryTests userImagesRegistryConfig, DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi]) - val cf = factory.createContainer(tid, "testContainer", image, false, 10.MB, 32) + val cf = factory.createContainer(tid, "testContainer", image, false, 10.MB, 32, None) + + val c = Await.result(cf, 5000.milliseconds) + + Await.result(c.destroy(), 500.milliseconds) + + } + + it should "set the docker run args with cpu limit when provided" in { + + val image = ExecManifest.runtimesManifest.manifests("nodejs:20").image + + implicit val tid = TransactionId.testing + val dockerApiStub = mock[DockerApiWithFileAccess] + //setup run expectation + (dockerApiStub + .run(_: String, _: Seq[String])(_: TransactionId)) + .expects( + image.resolveImageName(Some(runtimesRegistryConfig.url)), + List( + "--cpu-shares", + "32", //should be calculated as 1024/(numcore * sharefactor) via ContainerFactory.cpuShare + "--memory", + "10m", + "--memory-swap", + "10m", + "--network", + "net1", + "-e", + "__OW_API_HOST=", + "-e", + "k1=v1", + "-e", + "k2=v2", + "-e", + "k3=", + "--dns", + "dns1", + "--dns", + "dns2", + "--name", + "testContainer", + "--cpus", + "0.5", + "--extra1", + "e1", + "--extra1", + "e2", + "--extra2", + "e3", + "--extra2", + "e4"), + *) + .returning(Future.successful { ContainerId("fakecontainerid") }) + //setup inspect expectation + (dockerApiStub + .inspectIPAddress(_: ContainerId, _: String)(_: TransactionId)) + .expects(ContainerId("fakecontainerid"), "net1", *) + .returning(Future.successful { ContainerAddress("1.2.3.4", 1234) }) + //setup rm expectation + (dockerApiStub + .rm(_: ContainerId)(_: TransactionId)) + .expects(ContainerId("fakecontainerid"), *) + .returning(Future.successful(())) + //setup clientVersion exceptation + (dockerApiStub.clientVersion _) + .expects() + .returning("mock_test_client") + + val factory = + new DockerContainerFactory( + InvokerInstanceId(0, userMemory = defaultUserMemory), + Map.empty, + ContainerArgsConfig( + "net1", + Seq("dns1", "dns2"), + Seq.empty, + Seq.empty, + Seq("k1=v1", "k2=v2", "k3"), + Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))), + runtimesRegistryConfig, + userImagesRegistryConfig, + DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi]) + + val cf = factory.createContainer(tid, "testContainer", image, false, 10.MB, 32, Some(0.5)) val c = Await.result(cf, 5000.milliseconds) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala index dbfb8588382..25a63da55ee 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -137,6 +137,7 @@ class DockerContainerTests val image = "image" val memory = 128.MB val cpuShares = 1 + val cpuLimit = 0.5 val environment = Map("test" -> "hi") val network = "testwork" val name = "myContainer" @@ -145,6 +146,7 @@ class DockerContainerTests image = Right(ImageName(image)), memory = memory, cpuShares = cpuShares, + cpuLimit = Some(cpuLimit), environment = environment, network = network, name = Some(name), @@ -171,6 +173,7 @@ class DockerContainerTests args should contain inOrder ("--cpu-shares", cpuShares.toString) args should contain inOrder ("--network", network) args should contain inOrder ("--name", name) + args should contain inOrder ("--cpus", cpuLimit.toString) // Assert proper environment passing args should contain allOf ("-e", "test=hi") diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala new file mode 100644 index 00000000000..23e3a738e9f --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.containerpool.test + +import org.apache.openwhisk.core.containerpool.ContainerPoolConfig +import org.apache.openwhisk.core.entity.ByteSize +import org.apache.openwhisk.core.entity.size.SizeInt +import org.junit.runner.RunWith +import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.junit.JUnitRunner + +import scala.concurrent.duration.DurationInt + +@RunWith(classOf[JUnitRunner]) +class ContainerPoolConfigTests extends FlatSpec with Matchers { + + def createPoolConfig(userMemory: ByteSize, userCpus: Option[Double] = None): ContainerPoolConfig = { + ContainerPoolConfig(userMemory, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second, 10, userCpus) + } + + it should "calculate container cpu shares" in { + val (userMemory, memoryLimit) = (2.GB, 256.MB) + val poolConfig = createPoolConfig(userMemory) + poolConfig.cpuShare(memoryLimit) shouldBe 128 + } + + it should "use min cpu shares when calculated container cpu shares is too low" in { + val (userMemory, memoryLimit) = (1024.MB, 1.MB) + val poolConfig = createPoolConfig(userMemory) + poolConfig.cpuShare(memoryLimit) shouldBe 2 // calculated shares would be 1, but min is 2 + } + + it should "calculate container cpu limit" in { + val (userMemory, memoryLimit, userCpus) = (2.GB, 256.MB, 2.0) + val poolConfig = createPoolConfig(userMemory, Some(userCpus)) + poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.25) + } + + it should "correctly round container cpu limit" in { + val (userMemory, memoryLimit, userCpus) = (768.MB, 256.MB, 2.0) + val poolConfig = createPoolConfig(userMemory, Some(userCpus)) + poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.66667) // calculated limit is 0.666..., rounded to 0.66667 + } + + it should "use min container cpu limit when calculated limit is too low" in { + val (userMemory, memoryLimit, userCpus) = (1024.MB, 1.MB, 1.0) + val poolConfig = createPoolConfig(userMemory, Some(userCpus)) + poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.01) // calculated limit is 0.001, but min is 0.01 + } + + it should "return None for container cpu limit when userCpus is not set" in { + val (userMemory, memoryLimit) = (2.GB, 256.MB) + val poolConfig = createPoolConfig(userMemory) + poolConfig.cpuLimit(memoryLimit) shouldBe None + } + + it should "require userCpus to be greater than 0" in { + assertThrows[IllegalArgumentException] { + createPoolConfig(2.GB, Some(-1.0)) + } + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala index 03a0e1f1e15..e29eb15e56c 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala @@ -238,7 +238,14 @@ class ContainerProxyTests /** Creates an inspectable factory */ def createFactory(response: Future[Container]) = LoggedFunction { - (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) => + (_: TransactionId, + _: String, + _: ImageName, + _: Boolean, + _: ByteSize, + _: Int, + _: Option[Double], + _: Option[ExecutableWhiskAction]) => response } @@ -332,7 +339,7 @@ class ContainerProxyTests preWarm(machine) factory.calls should have size 1 - val (tid, name, _, _, memory, cpuShares, _) = factory.calls(0) + val (tid, name, _, _, memory, cpuShares, _, _) = factory.calls(0) tid shouldBe TransactionId.invokerWarmup name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind""" memory shouldBe memoryLimit diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala index 1aa85ad9e79..714a8f9b1e5 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala @@ -201,6 +201,7 @@ class FunctionPullingContainerPoolTests prewarmPromotion, memorySyncInterval, batchDeletionSize, + Some(2), prewarmContainerCreationConfig) def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId, @@ -460,6 +461,7 @@ class FunctionPullingContainerPoolTests false, FiniteDuration(10, TimeUnit.SECONDS), 10, + Some(2), prewarmContainerCreationConfig) val pool = system.actorOf( diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala index 36e1416fd29..3200aeed95c 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala @@ -215,7 +215,14 @@ class FunctionPullingContainerProxyTests /** Creates an inspectable factory */ def createFactory(response: Future[Container]) = LoggedFunction { - (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) => + (_: TransactionId, + _: String, + _: ImageName, + _: Boolean, + _: ByteSize, + _: Int, + _: Option[Double], + _: Option[ExecutableWhiskAction]) => response } @@ -396,7 +403,7 @@ class FunctionPullingContainerProxyTests preWarm(machine, probe) factory.calls should have size 1 - val (tid, name, _, _, memory, _, _) = factory.calls(0) + val (tid, name, _, _, memory, _, _, _) = factory.calls(0) tid shouldBe TransactionId.invokerWarmup name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind""" memory shouldBe memoryLimit @@ -586,7 +593,7 @@ class FunctionPullingContainerProxyTests case RequestActivation(Some(_), None) => true } - val (tid, name, _, _, memory, _, _) = factory.calls(0) + val (tid, name, _, _, memory, _, _, _) = factory.calls(0) tid shouldBe TransactionId.invokerColdstart name should fullyMatch regex """wskmyname\d+_\d+_actionSpace_actionName""" memory shouldBe memoryLimit diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala index a77ca0d5ba6..56196ed4c7b 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala @@ -176,7 +176,8 @@ class YARNContainerFactoryTests imageToCreate, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) Await.result(containerFuture, 60.seconds) @@ -231,7 +232,8 @@ class YARNContainerFactoryTests imageNotToDelete, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val containerFuture2 = factory.createContainer( TransactionId.testing, @@ -239,7 +241,8 @@ class YARNContainerFactoryTests imageToDelete, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val containerFuture3 = factory.createContainer( TransactionId.testing, @@ -247,7 +250,8 @@ class YARNContainerFactoryTests imageToDelete, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val containerFuture4 = factory.createContainer( TransactionId.testing, @@ -255,7 +259,8 @@ class YARNContainerFactoryTests imageToDelete, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val container1 = Await.result(containerFuture1, 30.seconds) val container2 = Await.result(containerFuture2, 30.seconds) @@ -349,7 +354,8 @@ class YARNContainerFactoryTests images(0), unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val container2Future = factory.createContainer( TransactionId.testing, @@ -357,7 +363,8 @@ class YARNContainerFactoryTests images(1), unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val container3Future = factory.createContainer( TransactionId.testing, @@ -365,7 +372,8 @@ class YARNContainerFactoryTests images(0), unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) Await.result(container1Future, 30.seconds) val container2 = Await.result(container2Future, 30.seconds) @@ -463,14 +471,16 @@ class YARNContainerFactoryTests imageToCreate, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) val containerFuture1 = factory1.createContainer( TransactionId.testing, "name", imageToCreate, unuseduserProvidedImage = true, ByteSize(256, SizeUnits.MB), - 1) + 1, + None) Await.result(containerFuture0, 60.seconds) Await.result(containerFuture1, 60.seconds)