Skip to content

Commit 34664c6

Browse files
bdoyle0182Brendan Doyle
authored andcommitted
Add Function Cache Refresh If Invoker Is Running Container For Function (apache#5327)
* wip * wip * add cache refresh to container proxy Co-authored-by: Brendan Doyle <[email protected]>
1 parent 954e132 commit 34664c6

File tree

1 file changed

+25
-9
lines changed

1 file changed

+25
-9
lines changed

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.containerpool.v2
1919

2020
import java.net.InetSocketAddress
2121
import java.time.Instant
22-
2322
import akka.actor.Status.{Failure => FailureMessage}
24-
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
23+
import akka.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
2524
import akka.event.Logging.InfoLevel
2625
import akka.io.{IO, Tcp}
2726
import akka.pattern.pipe
@@ -87,6 +86,7 @@ case class Initialized(data: InitializedData)
8786
case class Resumed(data: WarmData)
8887
case class ResumeFailed(data: WarmData)
8988
case class RecreateClient(action: ExecutableWhiskAction)
89+
case object PingCache
9090
case class DetermineKeepContainer(attempt: Int)
9191

9292
// States
@@ -209,7 +209,8 @@ class FunctionPullingContainerProxy(
209209
private val KeepingTimeoutName = "KeepingTimeout"
210210
private val RunningActivationTimeoutName = "RunningActivationTimeout"
211211
private val runningActivationTimeout = 10.seconds
212-
212+
private val PingCacheName = "PingCache"
213+
private val pingCacheInterval = 1.minute
213214
private var timedOut = false
214215

215216
var healthPingActor: Option[ActorRef] = None //setup after prewarm starts
@@ -374,6 +375,7 @@ class FunctionPullingContainerProxy(
374375
case Event(initializedData: InitializedData, _) =>
375376
context.parent ! Initialized(initializedData)
376377
initializedData.clientProxy ! RequestActivation()
378+
startTimerWithFixedDelay(PingCacheName, PingCache, pingCacheInterval)
377379
startSingleTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
378380
stay() using initializedData
379381

@@ -469,7 +471,7 @@ class FunctionPullingContainerProxy(
469471
data.action.rev,
470472
None)
471473

472-
case _ => delay
474+
case x: Event if x.event != PingCache => delay
473475
}
474476

475477
when(Rescheduling, stateTimeout = 10.seconds) {
@@ -626,7 +628,7 @@ class FunctionPullingContainerProxy(
626628
data.clientProxy ! CloseClientProxy
627629
stay
628630

629-
case _ => delay
631+
case x: Event if x.event != PingCache => delay
630632
}
631633

632634
when(Pausing) {
@@ -653,7 +655,7 @@ class FunctionPullingContainerProxy(
653655
data.action.rev,
654656
Some(data.clientProxy))
655657

656-
case _ => delay
658+
case x: Event if x.event != PingCache => delay
657659
}
658660

659661
when(Paused) {
@@ -745,7 +747,8 @@ class FunctionPullingContainerProxy(
745747
data.action.fullyQualifiedName(false),
746748
data.action.rev,
747749
Some(data.clientProxy))
748-
case _ => delay
750+
751+
case x: Event if x.event != PingCache => delay
749752
}
750753

751754
when(Removing, unusedTimeout) {
@@ -768,6 +771,20 @@ class FunctionPullingContainerProxy(
768771
stay()
769772
}
770773

774+
whenUnhandled {
775+
case Event(PingCache, data: WarmData) =>
776+
val actionId = data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
777+
get(entityStore, actionId.id, actionId.rev, true).map(_ => {
778+
logging.debug(
779+
this,
780+
s"Refreshed function cache for action ${data.action} from container ${data.container.containerId}.")
781+
})
782+
stay
783+
case Event(PingCache, _) =>
784+
logging.debug(this, "Container is not warm, ignore function cache ping.")
785+
stay
786+
}
787+
771788
onTransition {
772789
case _ -> Uninitialized => unstashAll()
773790
case _ -> CreatingContainer => unstashAll()
@@ -823,15 +840,14 @@ class FunctionPullingContainerProxy(
823840
fqn: FullyQualifiedEntityName,
824841
revision: DocRevision,
825842
clientProxy: Option[ActorRef]): State = {
826-
843+
cancelTimer(PingCacheName)
827844
dataManagementService ! UnregisterData(
828845
s"${ContainerKeys.existingContainers(invocationNamespace, fqn, revision, Some(instance), Some(container.containerId))}")
829846

830847
cleanUp(container, clientProxy)
831848
}
832849

833850
private def cleanUp(container: Container, clientProxy: Option[ActorRef], replacePrewarm: Boolean = true): State = {
834-
835851
context.parent ! ContainerRemoved(replacePrewarm)
836852
val unpause = stateName match {
837853
case Paused => container.resume()(TransactionId.invokerNanny)

0 commit comments

Comments
 (0)