Skip to content

Commit 470eaf5

Browse files
authored
Prewarm eviction variance (#4916)
* allow limiting number of prewarms to expire in one shot * fixing the order of prewarm removal/expiration tracking
1 parent 51aee46 commit 470eaf5

File tree

6 files changed

+182
-64
lines changed

6 files changed

+182
-64
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ case class ContainerArgsConfig(network: String,
4747
case class ContainerPoolConfig(userMemory: ByteSize,
4848
concurrentPeekFactor: Double,
4949
akkaClient: Boolean,
50-
prewarmExpirationCheckInterval: FiniteDuration) {
50+
prewarmExpirationCheckInterval: FiniteDuration,
51+
prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
52+
prewarmExpirationLimit: Int) {
5153
require(
5254
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
5355
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")

core/invoker/src/main/resources/application.conf

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ whisk {
6060
user-memory: 1024 m
6161
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
6262
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
63-
prewarm-expiration-check-interval: 1 minute
63+
prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
64+
prewarm-expiration-check-interval-variance: 10 seconds # varies expiration across invokers to avoid many concurrent expirations
65+
prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
6466
}
6567

6668
kubernetes {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

+74-43
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.openwhisk.core.entity.size._
2727
import scala.annotation.tailrec
2828
import scala.collection.immutable
2929
import scala.concurrent.duration._
30-
import scala.util.Try
30+
import scala.util.{Random, Try}
3131

3232
sealed trait WorkerState
3333
case object Busy extends WorkerState
@@ -90,11 +90,14 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
9090
adjustPrewarmedContainer(true, false)
9191

9292
// check periodically, adjust prewarmed container(delete if unused for some time and create some increment containers)
93-
context.system.scheduler.schedule(
94-
2.seconds,
95-
poolConfig.prewarmExpirationCheckInterval,
96-
self,
97-
AdjustPrewarmedContainer)
93+
// add some random amount to this schedule to avoid a herd of container removal + creation
94+
val interval = poolConfig.prewarmExpirationCheckInterval + poolConfig.prewarmExpirationCheckIntervalVariance
95+
.map(v =>
96+
Random
97+
.nextInt(v.toSeconds.toInt))
98+
.getOrElse(0)
99+
.seconds
100+
context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
98101

99102
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
100103
val namespaceName = r.msg.user.namespace.name
@@ -330,7 +333,20 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
330333

331334
/** adjust prewarm containers up to the configured requirements for each kind/memory combination. */
332335
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
333-
//fill in missing prewarms
336+
if (scheduled) {
337+
//on scheduled time, remove expired prewarms
338+
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
339+
prewarmedPool = prewarmedPool - p
340+
p ! Remove
341+
}
342+
//on scheduled time, emit cold start counter metric with memory + kind
343+
coldStartCount foreach { coldStart =>
344+
val coldStartKey = coldStart._1
345+
MetricEmitter.emitCounterMetric(
346+
LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
347+
}
348+
}
349+
//fill in missing prewarms (replaces any deletes)
334350
ContainerPool
335351
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
336352
.foreach { c =>
@@ -344,15 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
344360
}
345361
}
346362
if (scheduled) {
347-
//on scheduled time, remove expired prewarms
348-
ContainerPool.removeExpired(prewarmConfig, prewarmedPool).foreach(_ ! Remove)
349-
//on scheduled time, emit cold start counter metric with memory + kind
350-
coldStartCount foreach { coldStart =>
351-
val coldStartKey = coldStart._1
352-
MetricEmitter.emitCounterMetric(
353-
LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
354-
}
355-
// then clear coldStartCounts each time scheduled event is processed to reset counts
363+
// lastly, clear coldStartCounts each time scheduled event is processed to reset counts
356364
coldStartCount = immutable.Map.empty[ColdStartKey, Int]
357365
}
358366
}
@@ -452,10 +460,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
452460
MetricEmitter.emitGaugeMetric(
453461
LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
454462
containersInUse.map(_._2.memoryLimit.toMB).sum)
455-
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
463+
MetricEmitter.emitGaugeMetric(
464+
LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT,
465+
prewarmedPool.size + prewarmStartingPool.size)
456466
MetricEmitter.emitGaugeMetric(
457467
LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
458-
prewarmedPool.map(_._2.memoryLimit.toMB).sum)
468+
prewarmedPool.map(_._2.memoryLimit.toMB).sum + prewarmStartingPool.map(_._2._2.toMB).sum)
459469
val unused = freePool.filter(_._2.activeActivationCount == 0)
460470
val unusedMB = unused.map(_._2.memoryLimit.toMB).sum
461471
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_IDLES_COUNT, unused.size)
@@ -555,34 +565,53 @@ object ContainerPool {
555565
/**
556566
* Find the expired actor in prewarmedPool
557567
*
568+
* @param poolConfig
558569
* @param prewarmConfig
559570
* @param prewarmedPool
560571
* @param logging
561572
* @return a list of expired actor
562573
*/
563-
def removeExpired(prewarmConfig: List[PrewarmingConfig], prewarmedPool: Map[ActorRef, PreWarmedData])(
564-
implicit logging: Logging): List[ActorRef] = {
565-
prewarmConfig.flatMap { config =>
566-
val kind = config.exec.kind
567-
val memory = config.memoryLimit
568-
config.reactive
569-
.map { _ =>
570-
val expiredPrewarmedContainer = prewarmedPool
571-
.filter { warmInfo =>
572-
warmInfo match {
573-
case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
574-
case _ => false
574+
def removeExpired[A](poolConfig: ContainerPoolConfig,
575+
prewarmConfig: List[PrewarmingConfig],
576+
prewarmedPool: Map[A, PreWarmedData])(implicit logging: Logging): List[A] = {
577+
val now = Deadline.now
578+
val expireds = prewarmConfig
579+
.flatMap { config =>
580+
val kind = config.exec.kind
581+
val memory = config.memoryLimit
582+
config.reactive
583+
.map { c =>
584+
val expiredPrewarmedContainer = prewarmedPool.toSeq
585+
.filter { warmInfo =>
586+
warmInfo match {
587+
case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
588+
case _ => false
589+
}
575590
}
591+
.sortBy(_._2.expires.getOrElse(now))
592+
593+
// emit expired container counter metric with memory + kind
594+
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
595+
if (expiredPrewarmedContainer.nonEmpty) {
596+
logging.info(
597+
this,
598+
s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")
576599
}
577-
// emit expired container counter metric with memory + kind
578-
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
579-
logging.info(
580-
this,
581-
s"[kind: ${kind} memory: ${memory.toString}] removed ${expiredPrewarmedContainer.size} expired prewarmed container")
582-
expiredPrewarmedContainer.keys
600+
expiredPrewarmedContainer.map(e => (e._1, e._2.expires.getOrElse(now)))
601+
}
602+
.getOrElse(List.empty)
603+
}
604+
.sortBy(_._2) //need to sort these so that if the results are limited, we take the oldest
605+
.map(_._1)
606+
if (expireds.nonEmpty) {
607+
logging.info(this, s"removing up to ${poolConfig.prewarmExpirationLimit} of ${expireds.size} expired containers")
608+
expireds.take(poolConfig.prewarmExpirationLimit).foreach { e =>
609+
prewarmedPool.get(e).map { d =>
610+
logging.info(this, s"removing expired prewarm of kind ${d.kind} with container ${d.container} ")
583611
}
584-
.getOrElse(List.empty)
612+
}
585613
}
614+
expireds.take(poolConfig.prewarmExpirationLimit)
586615
}
587616

588617
/**
@@ -609,8 +638,8 @@ object ContainerPool {
609638
val memory = config.memoryLimit
610639

611640
val runningCount = prewarmedPool.count {
612-
// done starting, and not expired
613-
case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if !p.isExpired() => true
641+
// done starting (include expired, since they may not have been removed yet)
642+
case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) => true
614643
// started but not finished starting (or expired)
615644
case _ => false
616645
}
@@ -632,10 +661,12 @@ object ContainerPool {
632661
}
633662
}
634663

635-
logging.info(
636-
this,
637-
s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
638-
TransactionId.invokerWarmup)
664+
if (currentCount < desiredCount) {
665+
logging.info(
666+
this,
667+
s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
668+
TransactionId.invokerWarmup)
669+
}
639670
(config, (currentCount, desiredCount))
640671
}.toMap
641672
}

tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala

+1-3
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.openwhisk.core.containerpool.mesos.test
1919

20-
import java.util.concurrent.TimeUnit
21-
2220
import akka.actor.ActorSystem
2321
import akka.stream.ActorMaterializer
2422
import akka.stream.scaladsl.Sink
@@ -86,7 +84,7 @@ class MesosContainerFactoryTest
8684
}
8785

8886
// 80 slots, each 265MB
89-
val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
87+
val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 1.minute, None, 100)
9088
val actionMemory = 265.MB
9189
val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
9290

0 commit comments

Comments
 (0)