Skip to content

Commit 9005a08

Browse files
authored
Add fake clock for test code (#5304)
* Add fake clock for test code * Add test code for state timeout * Add test case for transaction _ => Flushing * Add StateTimeout test for Flushing state
1 parent 1a0f1ce commit 9005a08

File tree

4 files changed

+324
-86
lines changed

4 files changed

+324
-86
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common.time
19+
20+
import java.time.Instant
21+
22+
trait Clock {
23+
def now(): Instant
24+
}
25+
26+
object SystemClock extends Clock {
27+
def now() = Instant.now()
28+
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import akka.actor.Status.{Failure => FailureMessage}
2121
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
2222
import akka.util.Timeout
2323
import org.apache.openwhisk.common._
24+
import org.apache.openwhisk.common.time.{Clock, SystemClock}
2425
import org.apache.openwhisk.core.ConfigKeys
2526
import org.apache.openwhisk.core.ack.ActiveAck
2627
import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
@@ -121,13 +122,14 @@ class MemoryQueue(private val etcdClient: EtcdClient,
121122
ack: ActiveAck,
122123
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
123124
getUserLimit: String => Future[Int],
124-
checkToDropStaleActivation: (Queue[TimeSeriesActivationEntry],
125+
checkToDropStaleActivation: (Clock,
126+
Queue[TimeSeriesActivationEntry],
125127
Long,
126128
String,
127129
WhiskActionMetaData,
128130
MemoryQueueState,
129131
ActorRef) => Unit,
130-
queueConfig: QueueConfig)(implicit logging: Logging)
132+
queueConfig: QueueConfig)(implicit logging: Logging, clock: Clock)
131133
extends FSM[MemoryQueueState, MemoryQueueData]
132134
with Stash {
133135

@@ -342,7 +344,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
342344
msg.transid)
343345
val whiskError = isWhiskError(data.error)
344346
if (whiskError)
345-
queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
347+
queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg))
346348
else
347349
completeErrorActivation(msg, data.reason, whiskError)
348350
stay() using data.copy(activeDuringFlush = true)
@@ -351,8 +353,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
351353
// Instead, StateTimeout message will be sent by a timer.
352354
case Event(StateTimeout | DropOld, data: FlushingData) =>
353355
logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.")
354-
queue =
355-
MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
356+
queue = MemoryQueue.dropOld(
357+
clock,
358+
queue,
359+
Duration.ofMillis(actionRetentionTimeout),
360+
data.reason,
361+
completeErrorActivation)
356362
if (data.activeDuringFlush || queue.nonEmpty)
357363
stay using data.copy(activeDuringFlush = false)
358364
else
@@ -540,7 +546,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
540546

541547
case Event(DropOld, _) =>
542548
if (queue.nonEmpty && Duration
543-
.between(queue.head.timestamp, Instant.now)
549+
.between(queue.head.timestamp, clock.now())
544550
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
545551
logging.error(
546552
this,
@@ -550,6 +556,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
550556
s"[$invocationNamespace:$action:$stateName] the head stale message: ${queue.head.msg.activationId}")
551557
}
552558
queue = MemoryQueue.dropOld(
559+
clock,
553560
queue,
554561
Duration.ofMillis(actionRetentionTimeout),
555562
s"Activation processing is not initiated for $actionRetentionTimeout ms",
@@ -706,6 +713,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
706713
NoData()
707714
}
708715
}
716+
709717
private def cleanUpWatcher(): Unit = {
710718
watchedKeys.foreach { key =>
711719
watcherService ! UnwatchEndpoint(key, isPrefix = true, watcherName)
@@ -883,7 +891,14 @@ class MemoryQueue(private val etcdClient: EtcdClient,
883891
// these schedulers will run forever and stop when the memory queue stops
884892
private def startMonitoring(): (ActorRef, ActorRef) = {
885893
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
886-
checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self)
894+
checkToDropStaleActivation(
895+
clock,
896+
queue,
897+
actionRetentionTimeout,
898+
invocationNamespace,
899+
actionMetaData,
900+
stateName,
901+
self)
887902
Future.successful(())
888903
}
889904

@@ -930,7 +945,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
930945
@tailrec
931946
private def getStaleActivationNum(count: Int, queue: Queue[TimeSeriesActivationEntry]): Int = {
932947
if (queue.isEmpty || Duration
933-
.between(queue.head.timestamp, Instant.now)
948+
.between(queue.head.timestamp, clock.now())
934949
.compareTo(StaleDuration) < 0) count
935950
else
936951
getStaleActivationNum(count + 1, queue.tail)
@@ -988,7 +1003,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
9881003
stay
9891004
}
9901005
.getOrElse {
991-
queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
1006+
queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg))
9921007
in.decrementAndGet()
9931008
tryEnableActionThrottling()
9941009
}
@@ -1051,7 +1066,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
10511066

10521067
/** Generates an activation with zero runtime. Usually used for error cases */
10531068
private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = {
1054-
val now = Instant.now
1069+
val now = clock.now()
10551070
val causedBy = if (msg.causedBySequence) {
10561071
Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
10571072
} else None
@@ -1101,6 +1116,7 @@ object MemoryQueue {
11011116
ack: ActiveAck,
11021117
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
11031118
getUserLimit: String => Future[Int])(implicit logging: Logging): Props = {
1119+
implicit val clock: Clock = SystemClock
11041120
Props(
11051121
new MemoryQueue(
11061122
etcdClient,
@@ -1126,19 +1142,21 @@ object MemoryQueue {
11261142

11271143
@tailrec
11281144
def dropOld(
1145+
clock: Clock,
11291146
queue: Queue[TimeSeriesActivationEntry],
11301147
retention: Duration,
11311148
reason: String,
11321149
completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = {
1133-
if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
1150+
if (queue.isEmpty || Duration.between(queue.head.timestamp, clock.now()).compareTo(retention) < 0)
11341151
queue
11351152
else {
11361153
completeErrorActivation(queue.head.msg, reason, true)
1137-
dropOld(queue.tail, retention, reason, completeErrorActivation)
1154+
dropOld(clock, queue.tail, retention, reason, completeErrorActivation)
11381155
}
11391156
}
11401157

1141-
def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry],
1158+
def checkToDropStaleActivation(clock: Clock,
1159+
queue: Queue[TimeSeriesActivationEntry],
11421160
maxRetentionMs: Long,
11431161
invocationNamespace: String,
11441162
actionMetaData: WhiskActionMetaData,
@@ -1150,7 +1168,7 @@ object MemoryQueue {
11501168
s"[$invocationNamespace:$action:$stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.")
11511169

11521170
if (queue.nonEmpty && Duration
1153-
.between(queue.head.timestamp, Instant.now)
1171+
.between(queue.head.timestamp, clock.now())
11541172
.compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) {
11551173
logging.info(
11561174
this,

0 commit comments

Comments
 (0)