Skip to content

Add optional cpu limit to spawned action containers #5443

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ invoker:
heap: "{{ invoker_heap | default('2g') }}"
arguments: "{{ invoker_arguments | default('') }}"
userMemory: "{{ invoker_user_memory | default('2048m') }}"
userCpus: "{{ invoker_user_cpus | default() }}"
# Specify if it is allowed to deploy more than 1 invoker on a single machine.
allowMultipleInstances: "{{ invoker_allow_multiple_instances | default(false) }}"
# Specify if it should use docker-runc or docker to pause/unpause containers
Expand Down
1 change: 1 addition & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@
"CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
"INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
"CONFIG_whisk_containerPool_userMemory": "{{ hostvars[groups['invokers'][invoker_index | int]].user_memory | default(invoker.userMemory) }}"
"CONFIG_whisk_containerPool_userCpus": "{{ invoker.userCpus | default() }}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could deploy this branch w/ and w/o this configuration using ansible.
I confirmed that NanoCpus is properly configured in containers according to this config.

...
            "CpuShares": 25,
            "Memory": 268435456,
            "NanoCpus": 0,
...
...
            "CpuShares": 25,
            "Memory": 268435456,
            "NanoCpus": 150000000,
...

"CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
"CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc | default(false) | lower }}"
"WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.openwhisk.spi.Spi

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.math.max
import scala.math.{max, round}

case class ContainerArgsConfig(network: String,
dnsServers: Seq[String] = Seq.empty,
Expand Down Expand Up @@ -55,13 +55,15 @@ case class ContainerPoolConfig(userMemory: ByteSize,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
batchDeletionSize: Int,
userCpus: Option[Double] = None,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")

require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
require(batchDeletionSize > 0, "batch deletion size must be > 0")
require(userCpus.forall(_ > 0), "userCpus must be > 0")

/**
* The shareFactor indicates the number of containers that would share a single core, on average.
Expand All @@ -73,6 +75,16 @@ case class ContainerPoolConfig(userMemory: ByteSize,
// Grant more CPU to a container if it allocates more memory.
def cpuShare(reservedMemory: ByteSize) =
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2

private val minContainerCpus = 0.01 // The minimum cpus allowed by docker is 0.01
private val roundingMultiplier = 100000
def cpuLimit(reservedMemory: ByteSize): Option[Double] = {
userCpus.map(c => {
val containerCpus = c / (userMemory.toBytes / reservedMemory.toBytes)
val roundedContainerCpus = round(containerCpus * roundingMultiplier).toDouble / roundingMultiplier // Only use decimal precision of 5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

max(roundedContainerCpus, minContainerCpus)
})
}
}

case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
Expand Down Expand Up @@ -116,16 +128,18 @@ trait ContainerFactory {
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int,
cpuLimit: Option[Double],
action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)
createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares, cpuLimit)
}

def createContainer(tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container]
cpuShares: Int,
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container]

/** perform any initialization */
def init(): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class YARNContainerFactory(actorSystem: ActorSystem,
actionImage: ExecManifest.ImageName,
unuseduserProvidedImage: Boolean,
unusedmemory: ByteSize,
unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
unusedcpuShares: Int,
unusedcpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
implicit val timeout: Timeout = Timeout(containerStartTimeoutMS.milliseconds)

//First send the create command to YARN, then with a different actor, wait for the container to be ready
Expand Down
2 changes: 2 additions & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ whisk {
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
memory-sync-interval: 1 second # period to sync memory info to etcd
batch-deletion-size: 10 # batch size for removing containers when disable invoker, too big value may cause docker/k8s overload
# optional setting to specify the total allocatable cpus for all action containers, each container will get a fraction of this proportional to its allocated memory to limit the cpu
# user-cpus: 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, could you add a configuration like this?
https://github.com/apache/openwhisk/blob/master/ansible/roles/invoker/tasks/deploy.yml#L304

It would be easier to configure it with ansible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@style95 I just pushed some changes to ansible/roles/invoker/tasks/deploy.yml and ansible/group_vars/all. I'm not super familiar with ansible and was trying to keep it optional. Can you let me know how that looks?

}

kubernetes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class ContainerProxy(factory: (TransactionId,
Boolean,
ByteSize,
Int,
Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
Expand Down Expand Up @@ -288,6 +289,7 @@ class ContainerProxy(factory: (TransactionId,
job.exec.pull,
job.memoryLimit,
poolConfig.cpuShare(job.memoryLimit),
poolConfig.cpuLimit(job.memoryLimit),
None)
.map(container =>
PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow))))
Expand All @@ -307,6 +309,7 @@ class ContainerProxy(factory: (TransactionId,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
Some(job.action))

// container factory will either yield a new container ready to execute the action, or
Expand Down Expand Up @@ -978,6 +981,7 @@ object ContainerProxy {
Boolean,
ByteSize,
Int,
Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
ack: ActiveAck,
store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ object DockerContainer {
registryConfig: Option[RuntimesRegistryConfig] = None,
memory: ByteSize = 256.MB,
cpuShares: Int = 0,
cpuLimit: Option[Double] = None,
environment: Map[String, String] = Map.empty,
network: String = "bridge",
dnsServers: Seq[String] = Seq.empty,
Expand Down Expand Up @@ -101,6 +102,7 @@ object DockerContainer {
dnsSearch.flatMap(d => Seq("--dns-search", d)) ++
dnsOptions.flatMap(d => Seq(dnsOptString, d)) ++
name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++
cpuLimit.map(c => Seq("--cpus", c.toString)).getOrElse(Seq.empty) ++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wasn't aware that you could do both --cpus and --cpu-shares on the docker run. how does this look in practice with how it behaves setting both?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--cpu-shares still provides the weight/priority to cpu cycles for the container, --cpus just provides the cap.

params

val registryConfigUrl = registryConfig.map(_.url).getOrElse("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ class DockerContainerFactory(instance: InvokerInstanceId,
extends ContainerFactory {

/** Create a container using docker cli */
override def createContainer(tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
override def createContainer(
tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int,
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
val registryConfig =
ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig)
val image = if (userProvidedImage) Left(actionImage) else Right(actionImage)
Expand All @@ -72,6 +74,7 @@ class DockerContainerFactory(instance: InvokerInstanceId,
registryConfig = Some(registryConfig),
memory = memory,
cpuShares = cpuShares,
cpuLimit = cpuLimit,
environment = Map("__OW_API_HOST" -> config.wskApiHost) ++ containerArgsConfig.extraEnvVarMap,
network = containerArgsConfig.network,
dnsServers = containerArgsConfig.dnsServers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ class StandaloneDockerContainerFactory(instance: InvokerInstanceId, parameters:
private val pulledImages = new TrieMap[String, Boolean]()
private val factoryConfig = loadConfigOrThrow[StandaloneDockerConfig](ConfigKeys.standaloneDockerContainerFactory)

override def createContainer(tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
override def createContainer(
tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int,
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {

//For standalone server usage we would also want to pull the OpenWhisk provided image so as to ensure if
//local setup does not have the image then it pulls it down
Expand All @@ -84,7 +86,7 @@ class StandaloneDockerContainerFactory(instance: InvokerInstanceId, parameters:
}
} else Future.successful(true)

pulled.flatMap(_ => super.createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares))
pulled.flatMap(_ => super.createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares, cpuLimit))
}

override def init(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ class KubernetesContainerFactory(
Await.ready(cleaning, KubernetesContainerFactoryProvider.runtimeDeleteTimeout)
}

override def createContainer(tid: TransactionId,
name: String,
actionImage: ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
override def createContainer(
tid: TransactionId,
name: String,
actionImage: ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int,
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
val image = actionImage.resolveImageName(Some(
ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig).url))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class FunctionPullingContainerProxy(
Boolean,
ByteSize,
Int,
Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
Expand Down Expand Up @@ -240,6 +241,7 @@ class FunctionPullingContainerProxy(
job.exec.pull,
job.memoryLimit,
poolConfig.cpuShare(job.memoryLimit),
poolConfig.cpuLimit(job.memoryLimit),
None)
.map(container => PreWarmData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow)))
.pipeTo(self)
Expand All @@ -254,6 +256,7 @@ class FunctionPullingContainerProxy(
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
None)
.andThen {
case Failure(t) =>
Expand Down Expand Up @@ -1274,6 +1277,7 @@ object FunctionPullingContainerProxy {
Boolean,
ByteSize,
Int,
Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
Expand Down
18 changes: 13 additions & 5 deletions tests/dat/actions/zippedaction/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions tests/src/test/scala/common/LoggedFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ class LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5,
}
}

class LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, A4, A5, A6, A7, A8) => B)
extends Function8[A1, A2, A3, A4, A5, A6, A7, A8, B] {
val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6, A7, A8)]()

override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6, v7: A7, v8: A8): B = {
calls += ((v1, v2, v3, v4, v5, v6, v7, v8))
body(v1, v2, v3, v4, v5, v6, v7, v8)
}
}

class SynchronizedLoggedFunction1[A1, B](body: A1 => B) extends Function1[A1, B] {
val calls = mutable.Buffer[A1]()

Expand Down Expand Up @@ -157,6 +167,8 @@ object LoggedFunction {
new LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body)
def apply[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) => B) =
new LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body)
def apply[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, A4, A5, A6, A7, A8) => B) =
new LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body)
}

object SynchronizedLoggedFunction {
Expand Down
Loading