Skip to content

Commit 3a75379

Browse files
authored
Merge 8e0cf4e into 6375c96
2 parents 6375c96 + 8e0cf4e commit 3a75379

File tree

20 files changed

+290
-44
lines changed

20 files changed

+290
-44
lines changed

ansible/group_vars/all

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ invoker:
209209
heap: "{{ invoker_heap | default('2g') }}"
210210
arguments: "{{ invoker_arguments | default('') }}"
211211
userMemory: "{{ invoker_user_memory | default('2048m') }}"
212+
userCpus: "{{ invoker_user_cpus | default() }}"
212213
# Specify if it is allowed to deploy more than 1 invoker on a single machine.
213214
allowMultipleInstances: "{{ invoker_allow_multiple_instances | default(false) }}"
214215
# Specify if it should use docker-runc or docker to pause/unpause containers

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
"CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
259259
"INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
260260
"CONFIG_whisk_containerPool_userMemory": "{{ hostvars[groups['invokers'][invoker_index | int]].user_memory | default(invoker.userMemory) }}"
261+
"CONFIG_whisk_containerPool_userCpus": "{{ invoker.userCpus | default() }}"
261262
"CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
262263
"CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc | default(false) | lower }}"
263264
"WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.openwhisk.spi.Spi
2525

2626
import scala.concurrent.Future
2727
import scala.concurrent.duration.FiniteDuration
28-
import scala.math.max
28+
import scala.math.{max, round}
2929

3030
case class ContainerArgsConfig(network: String,
3131
dnsServers: Seq[String] = Seq.empty,
@@ -55,13 +55,15 @@ case class ContainerPoolConfig(userMemory: ByteSize,
5555
prewarmPromotion: Boolean,
5656
memorySyncInterval: FiniteDuration,
5757
batchDeletionSize: Int,
58+
userCpus: Option[Double] = None,
5859
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
5960
require(
6061
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
6162
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
6263

6364
require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
6465
require(batchDeletionSize > 0, "batch deletion size must be > 0")
66+
require(userCpus.forall(_ > 0), "userCpus must be > 0")
6567

6668
/**
6769
* The shareFactor indicates the number of containers that would share a single core, on average.
@@ -73,6 +75,16 @@ case class ContainerPoolConfig(userMemory: ByteSize,
7375
// Grant more CPU to a container if it allocates more memory.
7476
def cpuShare(reservedMemory: ByteSize) =
7577
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2
78+
79+
private val minContainerCpus = 0.01 // The minimum cpus allowed by docker is 0.01
80+
private val roundingMultiplier = 100000
81+
def cpuLimit(reservedMemory: ByteSize): Option[Double] = {
82+
userCpus.map(c => {
83+
val containerCpus = c / (userMemory.toBytes / reservedMemory.toBytes)
84+
val roundedContainerCpus = round(containerCpus * roundingMultiplier).toDouble / roundingMultiplier // Only use decimal precision of 5
85+
max(roundedContainerCpus, minContainerCpus)
86+
})
87+
}
7688
}
7789

7890
case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
@@ -116,16 +128,18 @@ trait ContainerFactory {
116128
userProvidedImage: Boolean,
117129
memory: ByteSize,
118130
cpuShares: Int,
131+
cpuLimit: Option[Double],
119132
action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
120-
createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)
133+
createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares, cpuLimit)
121134
}
122135

123136
def createContainer(tid: TransactionId,
124137
name: String,
125138
actionImage: ExecManifest.ImageName,
126139
userProvidedImage: Boolean,
127140
memory: ByteSize,
128-
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container]
141+
cpuShares: Int,
142+
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container]
129143

130144
/** perform any initialization */
131145
def init(): Unit

common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ class YARNContainerFactory(actorSystem: ActorSystem,
123123
actionImage: ExecManifest.ImageName,
124124
unuseduserProvidedImage: Boolean,
125125
unusedmemory: ByteSize,
126-
unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
126+
unusedcpuShares: Int,
127+
unusedcpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
127128
implicit val timeout: Timeout = Timeout(containerStartTimeoutMS.milliseconds)
128129

129130
//First send the create command to YARN, then with a different actor, wait for the container to be ready

core/invoker/src/main/resources/application.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ whisk {
7171
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
7272
memory-sync-interval: 1 second # period to sync memory info to etcd
7373
batch-deletion-size: 10 # batch size for removing containers when disable invoker, too big value may cause docker/k8s overload
74+
# optional setting to specify the total allocatable cpus for all action containers, each container will get a fraction of this proportional to its allocated memory to limit the cpu
75+
# user-cpus: 1
7476
}
7577

7678
kubernetes {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ class ContainerProxy(factory: (TransactionId,
250250
Boolean,
251251
ByteSize,
252252
Int,
253+
Option[Double],
253254
Option[ExecutableWhiskAction]) => Future[Container],
254255
sendActiveAck: ActiveAck,
255256
storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
@@ -288,6 +289,7 @@ class ContainerProxy(factory: (TransactionId,
288289
job.exec.pull,
289290
job.memoryLimit,
290291
poolConfig.cpuShare(job.memoryLimit),
292+
poolConfig.cpuLimit(job.memoryLimit),
291293
None)
292294
.map(container =>
293295
PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow))))
@@ -307,6 +309,7 @@ class ContainerProxy(factory: (TransactionId,
307309
job.action.exec.pull,
308310
job.action.limits.memory.megabytes.MB,
309311
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
312+
poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
310313
Some(job.action))
311314

312315
// container factory will either yield a new container ready to execute the action, or
@@ -978,6 +981,7 @@ object ContainerProxy {
978981
Boolean,
979982
ByteSize,
980983
Int,
984+
Option[Double],
981985
Option[ExecutableWhiskAction]) => Future[Container],
982986
ack: ActiveAck,
983987
store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ object DockerContainer {
6363
registryConfig: Option[RuntimesRegistryConfig] = None,
6464
memory: ByteSize = 256.MB,
6565
cpuShares: Int = 0,
66+
cpuLimit: Option[Double] = None,
6667
environment: Map[String, String] = Map.empty,
6768
network: String = "bridge",
6869
dnsServers: Seq[String] = Seq.empty,
@@ -101,6 +102,7 @@ object DockerContainer {
101102
dnsSearch.flatMap(d => Seq("--dns-search", d)) ++
102103
dnsOptions.flatMap(d => Seq(dnsOptString, d)) ++
103104
name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++
105+
cpuLimit.map(c => Seq("--cpus", c.toString)).getOrElse(Seq.empty) ++
104106
params
105107

106108
val registryConfigUrl = registryConfig.map(_.url).getOrElse("")

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,14 @@ class DockerContainerFactory(instance: InvokerInstanceId,
5757
extends ContainerFactory {
5858

5959
/** Create a container using docker cli */
60-
override def createContainer(tid: TransactionId,
61-
name: String,
62-
actionImage: ExecManifest.ImageName,
63-
userProvidedImage: Boolean,
64-
memory: ByteSize,
65-
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
60+
override def createContainer(
61+
tid: TransactionId,
62+
name: String,
63+
actionImage: ExecManifest.ImageName,
64+
userProvidedImage: Boolean,
65+
memory: ByteSize,
66+
cpuShares: Int,
67+
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
6668
val registryConfig =
6769
ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig)
6870
val image = if (userProvidedImage) Left(actionImage) else Right(actionImage)
@@ -72,6 +74,7 @@ class DockerContainerFactory(instance: InvokerInstanceId,
7274
registryConfig = Some(registryConfig),
7375
memory = memory,
7476
cpuShares = cpuShares,
77+
cpuLimit = cpuLimit,
7578
environment = Map("__OW_API_HOST" -> config.wskApiHost) ++ containerArgsConfig.extraEnvVarMap,
7679
network = containerArgsConfig.network,
7780
dnsServers = containerArgsConfig.dnsServers,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ class StandaloneDockerContainerFactory(instance: InvokerInstanceId, parameters:
6161
private val pulledImages = new TrieMap[String, Boolean]()
6262
private val factoryConfig = loadConfigOrThrow[StandaloneDockerConfig](ConfigKeys.standaloneDockerContainerFactory)
6363

64-
override def createContainer(tid: TransactionId,
65-
name: String,
66-
actionImage: ExecManifest.ImageName,
67-
userProvidedImage: Boolean,
68-
memory: ByteSize,
69-
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
64+
override def createContainer(
65+
tid: TransactionId,
66+
name: String,
67+
actionImage: ExecManifest.ImageName,
68+
userProvidedImage: Boolean,
69+
memory: ByteSize,
70+
cpuShares: Int,
71+
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
7072

7173
//For standalone server usage we would also want to pull the OpenWhisk provided image so as to ensure if
7274
//local setup does not have the image then it pulls it down
@@ -84,7 +86,7 @@ class StandaloneDockerContainerFactory(instance: InvokerInstanceId, parameters:
8486
}
8587
} else Future.successful(true)
8688

87-
pulled.flatMap(_ => super.createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares))
89+
pulled.flatMap(_ => super.createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares, cpuLimit))
8890
}
8991

9092
override def init(): Unit = {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,14 @@ class KubernetesContainerFactory(
6767
Await.ready(cleaning, KubernetesContainerFactoryProvider.runtimeDeleteTimeout)
6868
}
6969

70-
override def createContainer(tid: TransactionId,
71-
name: String,
72-
actionImage: ImageName,
73-
userProvidedImage: Boolean,
74-
memory: ByteSize,
75-
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
70+
override def createContainer(
71+
tid: TransactionId,
72+
name: String,
73+
actionImage: ImageName,
74+
userProvidedImage: Boolean,
75+
memory: ByteSize,
76+
cpuShares: Int,
77+
cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
7678
val image = actionImage.resolveImageName(Some(
7779
ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig).url))
7880

0 commit comments

Comments
 (0)