Skip to content

Commit 3802374

Browse files
authored
[New Scheduler] Implement FunctionPullingContainerPool (#5102)
* Implement FunctionPullingContainerPool * Fix review points
1 parent aa7e6e2 commit 3802374

File tree

16 files changed

+2269
-24
lines changed

16 files changed

+2269
-24
lines changed

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,11 @@
282282
"CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
283283
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
284284
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
285-
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
285+
"CONFIG_whisk_containerPool_prewarmExpirationCheckInitDelay": "{{ container_pool_prewarm_expirationCheckInitDelay | default('10 minutes') }}"
286+
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('10 minutes') }}"
287+
"CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
288+
"CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
289+
"CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
286290

287291
- name: extend invoker dns env
288292
set_fact:

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,10 @@ object LoggingMarkers {
388388

389389
// Time that is needed to produce message in kafka
390390
val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)(MeasurementUnit.time.milliseconds)
391+
def INVOKER_SHAREDPACKAGE(path: String) =
392+
LogMarkerToken(invoker, "sharedPackage", counter, None, Map("path" -> path))(MeasurementUnit.none)
393+
def INVOKER_CONTAINERPOOL_MEMORY(state: String) =
394+
LogMarkerToken(invoker, "containerPoolMemory", counter, Some(state), Map("state" -> state))(MeasurementUnit.none)
391395

392396
// System overload and random invoker assignment
393397
val MANAGED_SYSTEM_OVERLOAD =

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,3 +484,165 @@ object StatusData extends DefaultJsonProtocol {
484484
implicit val serdes =
485485
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
486486
}
487+
488+
case class ContainerCreationMessage(override val transid: TransactionId,
489+
invocationNamespace: String,
490+
action: FullyQualifiedEntityName,
491+
revision: DocRevision,
492+
whiskActionMetaData: WhiskActionMetaData,
493+
rootSchedulerIndex: SchedulerInstanceId,
494+
schedulerHost: String,
495+
rpcPort: Int,
496+
retryCount: Int = 0,
497+
creationId: CreationId = CreationId.generate())
498+
extends ContainerMessage(transid) {
499+
500+
override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
501+
override def serialize: String = toJson.compactPrint
502+
}
503+
504+
object ContainerCreationMessage extends DefaultJsonProtocol {
505+
def parse(msg: String): Try[ContainerCreationMessage] = Try(serdes.read(msg.parseJson))
506+
507+
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
508+
private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
509+
private implicit val byteSizeSerdes = size.serdes
510+
implicit val serdes = jsonFormat10(
511+
ContainerCreationMessage.apply(
512+
_: TransactionId,
513+
_: String,
514+
_: FullyQualifiedEntityName,
515+
_: DocRevision,
516+
_: WhiskActionMetaData,
517+
_: SchedulerInstanceId,
518+
_: String,
519+
_: Int,
520+
_: Int,
521+
_: CreationId))
522+
}
523+
524+
case class ContainerDeletionMessage(override val transid: TransactionId,
525+
invocationNamespace: String,
526+
action: FullyQualifiedEntityName,
527+
revision: DocRevision,
528+
whiskActionMetaData: WhiskActionMetaData)
529+
extends ContainerMessage(transid) {
530+
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
531+
override def serialize: String = toJson.compactPrint
532+
}
533+
534+
object ContainerDeletionMessage extends DefaultJsonProtocol {
535+
def parse(msg: String): Try[ContainerDeletionMessage] = Try(serdes.read(msg.parseJson))
536+
537+
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
538+
private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
539+
private implicit val byteSizeSerdes = size.serdes
540+
implicit val serdes = jsonFormat5(
541+
ContainerDeletionMessage
542+
.apply(_: TransactionId, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData))
543+
}
544+
545+
abstract class ContainerMessage(private val tid: TransactionId) extends Message {
546+
override val transid: TransactionId = tid
547+
override def serialize: String = ContainerMessage.serdes.write(this).compactPrint
548+
549+
/** Serializes the message to JSON. */
550+
def toJson: JsValue
551+
}
552+
553+
object ContainerMessage extends DefaultJsonProtocol {
554+
def parse(msg: String): Try[ContainerMessage] = Try(serdes.read(msg.parseJson))
555+
556+
implicit val serdes = new RootJsonFormat[ContainerMessage] {
557+
override def write(m: ContainerMessage): JsValue = m.toJson
558+
559+
override def read(json: JsValue): ContainerMessage = {
560+
val JsObject(fields) = json
561+
val creation = fields.contains("creationId")
562+
if (creation) {
563+
json.convertTo[ContainerCreationMessage]
564+
} else {
565+
json.convertTo[ContainerDeletionMessage]
566+
}
567+
}
568+
}
569+
}
570+
571+
sealed trait ContainerCreationError
572+
object ContainerCreationError extends Enumeration {
573+
case object NoAvailableInvokersError extends ContainerCreationError
574+
case object NoAvailableResourceInvokersError extends ContainerCreationError
575+
case object ResourceNotEnoughError extends ContainerCreationError
576+
case object WhiskError extends ContainerCreationError
577+
case object UnknownError extends ContainerCreationError
578+
case object TimeoutError extends ContainerCreationError
579+
case object ShuttingDownError extends ContainerCreationError
580+
case object NonExecutableActionError extends ContainerCreationError
581+
case object DBFetchError extends ContainerCreationError
582+
case object BlackBoxError extends ContainerCreationError
583+
case object ZeroNamespaceLimit extends ContainerCreationError
584+
case object TooManyConcurrentRequests extends ContainerCreationError
585+
586+
val whiskErrors: Set[ContainerCreationError] =
587+
Set(
588+
NoAvailableInvokersError,
589+
NoAvailableResourceInvokersError,
590+
ResourceNotEnoughError,
591+
WhiskError,
592+
ShuttingDownError,
593+
UnknownError,
594+
TimeoutError,
595+
ZeroNamespaceLimit)
596+
597+
def fromName(name: String) = name.toUpperCase match {
598+
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
599+
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
600+
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
601+
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
602+
case "DBFETCHERROR" => DBFetchError
603+
case "WHISKERROR" => WhiskError
604+
case "BLACKBOXERROR" => BlackBoxError
605+
case "TIMEOUTERROR" => TimeoutError
606+
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
607+
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
608+
case "UNKNOWNERROR" => UnknownError
609+
}
610+
611+
implicit val serds = new RootJsonFormat[ContainerCreationError] {
612+
override def write(error: ContainerCreationError): JsValue = JsString(error.toString)
613+
override def read(json: JsValue): ContainerCreationError =
614+
Try {
615+
val JsString(str) = json
616+
ContainerCreationError.fromName(str.trim.toUpperCase)
617+
} getOrElse {
618+
throw deserializationError("ContainerCreationError must be a valid string")
619+
}
620+
}
621+
}
622+
623+
case class ContainerCreationAckMessage(override val transid: TransactionId,
624+
creationId: CreationId,
625+
invocationNamespace: String,
626+
action: FullyQualifiedEntityName,
627+
revision: DocRevision,
628+
actionMetaData: WhiskActionMetaData,
629+
rootInvokerIndex: InvokerInstanceId,
630+
schedulerHost: String,
631+
rpcPort: Int,
632+
retryCount: Int = 0,
633+
error: Option[ContainerCreationError] = None,
634+
reason: Option[String] = None)
635+
extends Message {
636+
637+
/**
638+
* Serializes message to string. Must be idempotent.
639+
*/
640+
override def serialize: String = ContainerCreationAckMessage.serdes.write(this).compactPrint
641+
}
642+
643+
object ContainerCreationAckMessage extends DefaultJsonProtocol {
644+
def parse(msg: String): Try[ContainerCreationAckMessage] = Try(serdes.read(msg.parseJson))
645+
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
646+
private implicit val byteSizeSerdes = size.serdes
647+
implicit val serdes = jsonFormat12(ContainerCreationAckMessage.apply)
648+
}

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,14 @@ case class ContainerArgsConfig(network: String,
4747
case class ContainerPoolConfig(userMemory: ByteSize,
4848
concurrentPeekFactor: Double,
4949
akkaClient: Boolean,
50+
prewarmExpirationCheckInitDelay: FiniteDuration,
5051
prewarmExpirationCheckInterval: FiniteDuration,
5152
prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
52-
prewarmExpirationLimit: Int) {
53+
prewarmExpirationLimit: Int,
54+
prewarmMaxRetryLimit: Int,
55+
prewarmPromotion: Boolean,
56+
memorySyncInterval: FiniteDuration,
57+
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
5358
require(
5459
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
5560
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
@@ -68,6 +73,11 @@ case class ContainerPoolConfig(userMemory: ByteSize,
6873
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2
6974
}
7075

76+
case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
77+
require(maxConcurrent > 0, "maxConcurrent for per invoker must be > 0")
78+
require(creationDelay.toSeconds > 0, "creationDelay must be > 0")
79+
}
80+
7181
case class RuntimesRegistryCredentials(user: String, password: String)
7282

7383
case class RuntimesRegistryConfig(url: String, credentials: Option[RuntimesRegistryCredentials])

common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717

1818
package org.apache.openwhisk.core.entity
1919

20-
import scala.util.Try
20+
import org.apache.commons.lang3.StringUtils
2121

22+
import scala.util.Try
2223
import spray.json.DefaultJsonProtocol
2324
import spray.json.JsNull
2425
import spray.json.JsString
2526
import spray.json.JsValue
2627
import spray.json.RootJsonFormat
2728
import spray.json.deserializationError
2829
import spray.json._
29-
3030
import org.apache.openwhisk.core.entity.ArgNormalizer.trim
3131

3232
/**
@@ -56,11 +56,27 @@ protected[core] class DocId(val id: String) extends AnyVal {
5656
*
5757
* @param rev the document revision, optional
5858
*/
59-
protected[core] class DocRevision private (val rev: String) extends AnyVal {
59+
protected[core] class DocRevision private (val rev: String) extends AnyVal with Ordered[DocRevision] {
6060
def asString = rev // to make explicit that this is a string conversion
6161
def empty = rev == null
6262
override def toString = rev
6363
def serialize = DocRevision.serdes.write(this).compactPrint
64+
65+
override def compare(that: DocRevision): Int = {
66+
if (this.empty && that.empty) {
67+
0
68+
} else if (this.empty) {
69+
-1
70+
} else if (that.empty) {
71+
1
72+
} else {
73+
StringUtils.substringBefore(rev, "-").toInt - StringUtils.substringBefore(that.rev, "-").toInt
74+
}
75+
}
76+
77+
def ==(that: DocRevision): Boolean = {
78+
this.compare(that) == 0
79+
}
6480
}
6581

6682
/**

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ object size {
163163
implicit val pureconfigReader =
164164
ConfigReader[ConfigValue].map(v => ByteSize(v.atKey("key").getBytes("key"), SizeUnits.BYTE))
165165

166-
protected[entity] implicit val serdes = new RootJsonFormat[ByteSize] {
166+
implicit val serdes = new RootJsonFormat[ByteSize] {
167167
def write(b: ByteSize) = JsString(b.toString)
168168

169169
def read(value: JsValue): ByteSize = value match {

core/invoker/src/main/resources/application.conf

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,13 @@ whisk {
6060
user-memory: 1024 m
6161
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
6262
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
63-
prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
63+
prewarm-expiration-check-init-delay: 10 minute # the init delay time for the first check
64+
prewarm-expiration-check-interval: 10 minute # period to check for prewarm expiration
6465
prewarm-expiration-check-interval-variance: 10 seconds # varies expiration across invokers to avoid many concurrent expirations
6566
prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
67+
prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
68+
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
69+
memory-sync-interval: 1 second # period to sync memory info to etcd
6670
}
6771

6872
kubernetes {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
9191
.nextInt(v.toSeconds.toInt))
9292
.getOrElse(0)
9393
.seconds
94-
context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
94+
if (prewarmConfig.exists(!_.reactive.isEmpty)) {
95+
context.system.scheduler.schedule(
96+
poolConfig.prewarmExpirationCheckInitDelay,
97+
interval,
98+
self,
99+
AdjustPrewarmedContainer)
100+
}
95101

96102
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
97103
val namespaceName = r.msg.user.namespace.name.asString
@@ -590,9 +596,9 @@ object ContainerPool {
590596
}
591597
.sortBy(_._2.expires.getOrElse(now))
592598

593-
// emit expired container counter metric with memory + kind
594-
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
595599
if (expiredPrewarmedContainer.nonEmpty) {
600+
// emit expired container counter metric with memory + kind
601+
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
596602
logging.info(
597603
this,
598604
s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")

0 commit comments

Comments
 (0)