|
17 | 17 |
|
18 | 18 | package org.apache.openwhisk.core.containerpool.v2
|
19 | 19 |
|
| 20 | +import java.util.concurrent.atomic.AtomicInteger |
| 21 | + |
20 | 22 | import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
|
21 | 23 | import org.apache.kafka.clients.producer.RecordMetadata
|
22 | 24 | import org.apache.openwhisk.common._
|
@@ -48,8 +50,8 @@ import scala.concurrent.duration._
|
48 | 50 | import scala.util.{Random, Try}
|
49 | 51 | import scala.collection.immutable.Queue
|
50 | 52 |
|
51 |
| -case class Creation(creationMessage: ContainerCreationMessage, action: WhiskAction) |
52 |
| -case class Deletion(deletionMessage: ContainerDeletionMessage) |
| 53 | +case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction) |
| 54 | +case class DeletionContainer(deletionMessage: ContainerDeletionMessage) |
53 | 55 | case object Remove
|
54 | 56 | case class Keep(timeout: FiniteDuration)
|
55 | 57 | case class PrewarmContainer(maxConcurrent: Int)
|
@@ -98,7 +100,7 @@ class FunctionPullingContainerPool(
|
98 | 100 |
|
99 | 101 | private var preWarmScheduler: Option[Cancellable] = None
|
100 | 102 | private var prewarmConfigQueue = Queue.empty[(CodeExec[_], ByteSize, Option[FiniteDuration])]
|
101 |
| - private var prewarmCreateFailedCount = immutable.Map.empty[(String, ByteSize), Int] |
| 103 | + private val prewarmCreateFailedCount = new AtomicInteger(0) |
102 | 104 |
|
103 | 105 | val logScheduler = context.system.scheduler.schedule(0.seconds, 1.seconds) {
|
104 | 106 | MetricEmitter.emitHistogramMetric(
|
@@ -166,7 +168,7 @@ class FunctionPullingContainerPool(
|
166 | 168 | }
|
167 | 169 | }
|
168 | 170 |
|
169 |
| - case Creation(create: ContainerCreationMessage, action: WhiskAction) => |
| 171 | + case CreationContainer(create: ContainerCreationMessage, action: WhiskAction) => |
170 | 172 | if (shuttingDown) {
|
171 | 173 | val message =
|
172 | 174 | s"creationId: ${create.creationId}, invoker is shutting down, reschedule ${action.fullyQualifiedName(false)}"
|
@@ -217,7 +219,7 @@ class FunctionPullingContainerPool(
|
217 | 219 | }
|
218 | 220 | }
|
219 | 221 |
|
220 |
| - case Deletion(deletionMessage: ContainerDeletionMessage) => |
| 222 | + case DeletionContainer(deletionMessage: ContainerDeletionMessage) => |
221 | 223 | val oldRevision = deletionMessage.revision
|
222 | 224 | val invocationNamespace = deletionMessage.invocationNamespace
|
223 | 225 | val fqn = deletionMessage.action.copy(version = None)
|
@@ -252,7 +254,10 @@ class FunctionPullingContainerPool(
|
252 | 254 | case ReadyToWork(data) =>
|
253 | 255 | prewarmStartingPool = prewarmStartingPool - sender()
|
254 | 256 | prewarmedPool = prewarmedPool + (sender() -> data)
|
255 |
| - prewarmCreateFailedCount = prewarmCreateFailedCount - ((data.kind, data.memoryLimit)) |
| 257 | + // after create prewarm successfully, reset the value to 0 |
| 258 | + if (prewarmCreateFailedCount.get() > 0) { |
| 259 | + prewarmCreateFailedCount.set(0) |
| 260 | + } |
256 | 261 |
|
257 | 262 | // Container is initialized
|
258 | 263 | case Initialized(data) =>
|
@@ -353,26 +358,13 @@ class FunctionPullingContainerPool(
|
353 | 358 | logging.info(
|
354 | 359 | this,
|
355 | 360 | s"${if (replacePrewarm) "failed" else "expired"} prewarm [kind: ${data.kind}, memory: ${data.memoryLimit.toString}] removed")
|
356 |
| - if (replacePrewarm) { |
357 |
| - prewarmCreateFailedCount.get(data.kind, data.memoryLimit) match { |
358 |
| - case Some(retry) => |
359 |
| - prewarmCreateFailedCount = prewarmCreateFailedCount + ((data.kind, data.memoryLimit) -> (retry + 1)) |
360 |
| - case _ => |
361 |
| - prewarmCreateFailedCount = prewarmCreateFailedCount + ((data.kind, data.memoryLimit) -> 1) |
362 |
| - } |
363 |
| - } |
364 | 361 | }
|
365 | 362 |
|
366 | 363 | //in case this was a starting prewarm
|
367 | 364 | prewarmStartingPool.get(sender()).foreach { data =>
|
368 | 365 | logging.info(this, s"failed starting prewarm [kind: ${data._1}, memory: ${data._2.toString}] removed")
|
369 | 366 | prewarmStartingPool = prewarmStartingPool - sender()
|
370 |
| - prewarmCreateFailedCount.get(data._1, data._2) match { |
371 |
| - case Some(retry) => |
372 |
| - prewarmCreateFailedCount = prewarmCreateFailedCount + ((data._1, data._2) -> (retry + 1)) |
373 |
| - case _ => |
374 |
| - prewarmCreateFailedCount = prewarmCreateFailedCount + ((data._1, data._2) -> 1) |
375 |
| - } |
| 367 | + prewarmCreateFailedCount.incrementAndGet() |
376 | 368 | }
|
377 | 369 |
|
378 | 370 | //backfill prewarms on every ContainerRemoved, just in case
|
@@ -408,7 +400,7 @@ class FunctionPullingContainerPool(
|
408 | 400 |
|
409 | 401 | case AdjustPrewarmedContainer =>
|
410 | 402 | // Reset the prewarmCreateCount value when do expiration check and backfill prewarm if possible
|
411 |
| - prewarmCreateFailedCount = immutable.Map.empty[(String, ByteSize), Int] |
| 403 | + prewarmCreateFailedCount.set(0) |
412 | 404 | adjustPrewarmedContainer(false, true)
|
413 | 405 | }
|
414 | 406 |
|
@@ -442,8 +434,7 @@ class FunctionPullingContainerPool(
|
442 | 434 | val config = c._1
|
443 | 435 | val currentCount = c._2._1
|
444 | 436 | val desiredCount = c._2._2
|
445 |
| - val retry = prewarmCreateFailedCount.get((config.exec.kind, config.memoryLimit)).getOrElse(0) |
446 |
| - if (retry > poolConfig.prewarmMaxRetryLimit) { |
| 437 | + if (prewarmCreateFailedCount.get() > poolConfig.prewarmMaxRetryLimit) { |
447 | 438 | logging.warn(
|
448 | 439 | this,
|
449 | 440 | s"[kind: ${config.exec.kind}, memory: ${config.memoryLimit.toString}] prewarm create failed count exceeds max retry limit: ${poolConfig.prewarmMaxRetryLimit}, currentCount: ${currentCount}, desiredCount: ${desiredCount}")
|
|
0 commit comments