@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.containerpool.v2
19
19
20
20
import java .net .InetSocketAddress
21
21
import java .time .Instant
22
-
23
22
import akka .actor .Status .{Failure => FailureMessage }
24
- import akka .actor .{ActorRef , ActorRefFactory , ActorSystem , FSM , Props , Stash }
23
+ import akka .actor .{actorRef2Scala , ActorRef , ActorRefFactory , ActorSystem , FSM , Props , Stash }
25
24
import akka .event .Logging .InfoLevel
26
25
import akka .io .{IO , Tcp }
27
26
import akka .pattern .pipe
@@ -87,6 +86,7 @@ case class Initialized(data: InitializedData)
87
86
case class Resumed (data : WarmData )
88
87
case class ResumeFailed (data : WarmData )
89
88
case class RecreateClient (action : ExecutableWhiskAction )
89
+ case object PingCache
90
90
case class DetermineKeepContainer (attempt : Int )
91
91
92
92
// States
@@ -209,7 +209,8 @@ class FunctionPullingContainerProxy(
209
209
private val KeepingTimeoutName = " KeepingTimeout"
210
210
private val RunningActivationTimeoutName = " RunningActivationTimeout"
211
211
private val runningActivationTimeout = 10 .seconds
212
-
212
+ private val PingCacheName = " PingCache"
213
+ private val pingCacheInterval = 1 .minute
213
214
private var timedOut = false
214
215
215
216
var healthPingActor : Option [ActorRef ] = None // setup after prewarm starts
@@ -374,6 +375,7 @@ class FunctionPullingContainerProxy(
374
375
case Event (initializedData : InitializedData , _) =>
375
376
context.parent ! Initialized (initializedData)
376
377
initializedData.clientProxy ! RequestActivation ()
378
+ startTimerWithFixedDelay(PingCacheName , PingCache , pingCacheInterval)
377
379
startSingleTimer(UnusedTimeoutName , StateTimeout , unusedTimeout)
378
380
stay() using initializedData
379
381
@@ -469,7 +471,7 @@ class FunctionPullingContainerProxy(
469
471
data.action.rev,
470
472
None )
471
473
472
- case _ => delay
474
+ case x : Event if x.event != PingCache => delay
473
475
}
474
476
475
477
when(Rescheduling , stateTimeout = 10 .seconds) {
@@ -626,7 +628,7 @@ class FunctionPullingContainerProxy(
626
628
data.clientProxy ! CloseClientProxy
627
629
stay
628
630
629
- case _ => delay
631
+ case x : Event if x.event != PingCache => delay
630
632
}
631
633
632
634
when(Pausing ) {
@@ -653,7 +655,7 @@ class FunctionPullingContainerProxy(
653
655
data.action.rev,
654
656
Some (data.clientProxy))
655
657
656
- case _ => delay
658
+ case x : Event if x.event != PingCache => delay
657
659
}
658
660
659
661
when(Paused ) {
@@ -745,7 +747,8 @@ class FunctionPullingContainerProxy(
745
747
data.action.fullyQualifiedName(false ),
746
748
data.action.rev,
747
749
Some (data.clientProxy))
748
- case _ => delay
750
+
751
+ case x : Event if x.event != PingCache => delay
749
752
}
750
753
751
754
when(Removing , unusedTimeout) {
@@ -768,6 +771,20 @@ class FunctionPullingContainerProxy(
768
771
stay()
769
772
}
770
773
774
+ whenUnhandled {
775
+ case Event (PingCache , data : WarmData ) =>
776
+ val actionId = data.action.fullyQualifiedName(false ).toDocId.asDocInfo(data.revision)
777
+ get(entityStore, actionId.id, actionId.rev, true ).map(_ => {
778
+ logging.debug(
779
+ this ,
780
+ s " Refreshed function cache for action ${data.action} from container ${data.container.containerId}. " )
781
+ })
782
+ stay
783
+ case Event (PingCache , _) =>
784
+ logging.debug(this , " Container is not warm, ignore function cache ping." )
785
+ stay
786
+ }
787
+
771
788
onTransition {
772
789
case _ -> Uninitialized => unstashAll()
773
790
case _ -> CreatingContainer => unstashAll()
@@ -823,15 +840,14 @@ class FunctionPullingContainerProxy(
823
840
fqn : FullyQualifiedEntityName ,
824
841
revision : DocRevision ,
825
842
clientProxy : Option [ActorRef ]): State = {
826
-
843
+ cancelTimer( PingCacheName )
827
844
dataManagementService ! UnregisterData (
828
845
s " ${ContainerKeys .existingContainers(invocationNamespace, fqn, revision, Some (instance), Some (container.containerId))}" )
829
846
830
847
cleanUp(container, clientProxy)
831
848
}
832
849
833
850
private def cleanUp (container : Container , clientProxy : Option [ActorRef ], replacePrewarm : Boolean = true ): State = {
834
-
835
851
context.parent ! ContainerRemoved (replacePrewarm)
836
852
val unpause = stateName match {
837
853
case Paused => container.resume()(TransactionId .invokerNanny)
0 commit comments