@@ -21,6 +21,7 @@ import akka.actor.Status.{Failure => FailureMessage}
21
21
import akka .actor .{ActorRef , ActorSystem , Cancellable , FSM , Props , Stash }
22
22
import akka .util .Timeout
23
23
import org .apache .openwhisk .common ._
24
+ import org .apache .openwhisk .common .time .{Clock , SystemClock }
24
25
import org .apache .openwhisk .core .ConfigKeys
25
26
import org .apache .openwhisk .core .ack .ActiveAck
26
27
import org .apache .openwhisk .core .connector .ContainerCreationError .ZeroNamespaceLimit
@@ -121,13 +122,14 @@ class MemoryQueue(private val etcdClient: EtcdClient,
121
122
ack : ActiveAck ,
122
123
store : (TransactionId , WhiskActivation , UserContext ) => Future [Any ],
123
124
getUserLimit : String => Future [Int ],
124
- checkToDropStaleActivation : (Queue [TimeSeriesActivationEntry ],
125
+ checkToDropStaleActivation : (Clock ,
126
+ Queue [TimeSeriesActivationEntry ],
125
127
Long ,
126
128
String ,
127
129
WhiskActionMetaData ,
128
130
MemoryQueueState ,
129
131
ActorRef ) => Unit ,
130
- queueConfig : QueueConfig )(implicit logging : Logging )
132
+ queueConfig : QueueConfig )(implicit logging : Logging , clock : Clock )
131
133
extends FSM [MemoryQueueState , MemoryQueueData ]
132
134
with Stash {
133
135
@@ -342,7 +344,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
342
344
msg.transid)
343
345
val whiskError = isWhiskError(data.error)
344
346
if (whiskError)
345
- queue = queue.enqueue(TimeSeriesActivationEntry (Instant .now, msg))
347
+ queue = queue.enqueue(TimeSeriesActivationEntry (clock .now() , msg))
346
348
else
347
349
completeErrorActivation(msg, data.reason, whiskError)
348
350
stay() using data.copy(activeDuringFlush = true )
@@ -351,8 +353,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
351
353
// Instead, StateTimeout message will be sent by a timer.
352
354
case Event (StateTimeout | DropOld , data : FlushingData ) =>
353
355
logging.info(this , s " [ $invocationNamespace: $action: $stateName] Received StateTimeout, drop stale messages. " )
354
- queue =
355
- MemoryQueue .dropOld(queue, Duration .ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
356
+ queue = MemoryQueue .dropOld(
357
+ clock,
358
+ queue,
359
+ Duration .ofMillis(actionRetentionTimeout),
360
+ data.reason,
361
+ completeErrorActivation)
356
362
if (data.activeDuringFlush || queue.nonEmpty)
357
363
stay using data.copy(activeDuringFlush = false )
358
364
else
@@ -540,7 +546,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
540
546
541
547
case Event (DropOld , _) =>
542
548
if (queue.nonEmpty && Duration
543
- .between(queue.head.timestamp, Instant .now)
549
+ .between(queue.head.timestamp, clock .now() )
544
550
.compareTo(Duration .ofMillis(actionRetentionTimeout)) < 0 ) {
545
551
logging.error(
546
552
this ,
@@ -550,6 +556,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
550
556
s " [ $invocationNamespace: $action: $stateName] the head stale message: ${queue.head.msg.activationId}" )
551
557
}
552
558
queue = MemoryQueue .dropOld(
559
+ clock,
553
560
queue,
554
561
Duration .ofMillis(actionRetentionTimeout),
555
562
s " Activation processing is not initiated for $actionRetentionTimeout ms " ,
@@ -706,6 +713,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
706
713
NoData ()
707
714
}
708
715
}
716
+
709
717
private def cleanUpWatcher (): Unit = {
710
718
watchedKeys.foreach { key =>
711
719
watcherService ! UnwatchEndpoint (key, isPrefix = true , watcherName)
@@ -883,7 +891,14 @@ class MemoryQueue(private val etcdClient: EtcdClient,
883
891
// these schedulers will run forever and stop when the memory queue stops
884
892
private def startMonitoring (): (ActorRef , ActorRef ) = {
885
893
val droppingScheduler = Scheduler .scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
886
- checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self)
894
+ checkToDropStaleActivation(
895
+ clock,
896
+ queue,
897
+ actionRetentionTimeout,
898
+ invocationNamespace,
899
+ actionMetaData,
900
+ stateName,
901
+ self)
887
902
Future .successful(())
888
903
}
889
904
@@ -930,7 +945,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
930
945
@ tailrec
931
946
private def getStaleActivationNum (count : Int , queue : Queue [TimeSeriesActivationEntry ]): Int = {
932
947
if (queue.isEmpty || Duration
933
- .between(queue.head.timestamp, Instant .now)
948
+ .between(queue.head.timestamp, clock .now() )
934
949
.compareTo(StaleDuration ) < 0 ) count
935
950
else
936
951
getStaleActivationNum(count + 1 , queue.tail)
@@ -988,7 +1003,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
988
1003
stay
989
1004
}
990
1005
.getOrElse {
991
- queue = queue.enqueue(TimeSeriesActivationEntry (Instant .now, msg))
1006
+ queue = queue.enqueue(TimeSeriesActivationEntry (clock .now() , msg))
992
1007
in.decrementAndGet()
993
1008
tryEnableActionThrottling()
994
1009
}
@@ -1051,7 +1066,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
1051
1066
1052
1067
/** Generates an activation with zero runtime. Usually used for error cases */
1053
1068
private def generateFallbackActivation (msg : ActivationMessage , response : ActivationResponse ): WhiskActivation = {
1054
- val now = Instant .now
1069
+ val now = clock .now()
1055
1070
val causedBy = if (msg.causedBySequence) {
1056
1071
Some (Parameters (WhiskActivation .causedByAnnotation, JsString (Exec .SEQUENCE )))
1057
1072
} else None
@@ -1101,6 +1116,7 @@ object MemoryQueue {
1101
1116
ack : ActiveAck ,
1102
1117
store : (TransactionId , WhiskActivation , UserContext ) => Future [Any ],
1103
1118
getUserLimit : String => Future [Int ])(implicit logging : Logging ): Props = {
1119
+ implicit val clock : Clock = SystemClock
1104
1120
Props (
1105
1121
new MemoryQueue (
1106
1122
etcdClient,
@@ -1126,19 +1142,21 @@ object MemoryQueue {
1126
1142
1127
1143
@ tailrec
1128
1144
def dropOld (
1145
+ clock : Clock ,
1129
1146
queue : Queue [TimeSeriesActivationEntry ],
1130
1147
retention : Duration ,
1131
1148
reason : String ,
1132
1149
completeErrorActivation : (ActivationMessage , String , Boolean ) => Future [Any ]): Queue [TimeSeriesActivationEntry ] = {
1133
- if (queue.isEmpty || Duration .between(queue.head.timestamp, Instant .now).compareTo(retention) < 0 )
1150
+ if (queue.isEmpty || Duration .between(queue.head.timestamp, clock .now() ).compareTo(retention) < 0 )
1134
1151
queue
1135
1152
else {
1136
1153
completeErrorActivation(queue.head.msg, reason, true )
1137
- dropOld(queue.tail, retention, reason, completeErrorActivation)
1154
+ dropOld(clock, queue.tail, retention, reason, completeErrorActivation)
1138
1155
}
1139
1156
}
1140
1157
1141
- def checkToDropStaleActivation (queue : Queue [TimeSeriesActivationEntry ],
1158
+ def checkToDropStaleActivation (clock : Clock ,
1159
+ queue : Queue [TimeSeriesActivationEntry ],
1142
1160
maxRetentionMs : Long ,
1143
1161
invocationNamespace : String ,
1144
1162
actionMetaData : WhiskActionMetaData ,
@@ -1150,7 +1168,7 @@ object MemoryQueue {
1150
1168
s " [ $invocationNamespace: $action: $stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}. " )
1151
1169
1152
1170
if (queue.nonEmpty && Duration
1153
- .between(queue.head.timestamp, Instant .now)
1171
+ .between(queue.head.timestamp, clock .now() )
1154
1172
.compareTo(Duration .ofMillis(maxRetentionMs)) >= 0 ) {
1155
1173
logging.info(
1156
1174
this ,
0 commit comments