Skip to content

Commit 37f1f99

Browse files
authored
send old version memoryQueue's stale activation to queueManager when update action (#5228)
* Complete old activation when updating an action if the old container doesn't exist * Send old version activation to queueManager to reschedule * Fix test case error * Add exponential delay time retry * Fix test case * Make queueManager test case more stable * Optimize memoryQueue test case * Make test case more stable * Change method name to be more readable
1 parent 33cfb36 commit 37f1f99

File tree

10 files changed

+157
-50
lines changed

10 files changed

+157
-50
lines changed

ansible/group_vars/all

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ scheduler:
510510
extraEnv: "{{ scheduler_extraEnv | default({}) }}"
511511
dataManagementService:
512512
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
513-
inProgressJobRetentionSecond: "{{ scheduler_inProgressJobRetentionSecond | default('20 seconds') }}"
513+
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
514514
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
515515
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
516516
queueManager:

ansible/roles/schedulers/tasks/deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
"CONFIG_whisk_scheduler_protocol": "{{ scheduler.protocol }}"
113113
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
114114
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
115-
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetentionSecond }}"
115+
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
116116
"CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
117117
"CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
118118
"CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ object ConfigKeys {
302302
val schedulerMaxPeek = "whisk.scheduler.max-peek"
303303
val schedulerQueue = "whisk.scheduler.queue"
304304
val schedulerQueueManager = "whisk.scheduler.queue-manager"
305-
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
305+
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
306306

307307
val whiskClusterName = "whisk.cluster.name"
308308

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
5050
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
5151
extends Actor {
5252
private implicit val ec: ExecutionContext = actorSystem.dispatcher
53-
private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
53+
private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
5454
private val retryLimit = 5
55-
private val retryDelayTime = 100.milliseconds
5655

5756
/**
5857
* Store a JobEntry in local to get an alarm for key timeout
@@ -83,7 +82,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
8382
error,
8483
reason)) =>
8584
if (error.isEmpty) {
86-
logging.info(this, s"[$creationId] create container successfully")
85+
logging.info(this, s"[$action] [$creationId] create container successfully")
8786
deleteJob(
8887
invocationNamespace,
8988
action,
@@ -97,7 +96,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
9796
if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) {
9897
logging.error(
9998
this,
100-
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
99+
s"[$action] [$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
101100
// Delete from pool after all retries are failed
102101
deleteJob(
103102
invocationNamespace,
@@ -109,8 +108,9 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
109108
// Reschedule
110109
logging.error(
111110
this,
112-
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
113-
// Add some time interval during retry create container, because etcd put operation needs some time if data inconsistant happens
111+
s"[$action] [$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
112+
// Add an exponentially growing delay between container creation retries, because the etcd put operation needs some time if a data inconsistency happens
113+
val retryDelayTime = (scala.math.pow(2, retryCount) * 100).milliseconds
114114
actorSystem.scheduler.scheduleOnce(retryDelayTime) {
115115
context.parent ! ReschedulingCreationJob(
116116
tid,
@@ -183,16 +183,14 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
183183
s"Failed to create a container for $action(blackbox: $isBlackbox), error: $creationId timed out after $timeout")
184184
creationJobPool
185185
.remove(creationId)
186-
.foreach(
187-
_ =>
188-
sendState(
189-
FailedCreationJob(
190-
creationId,
191-
invocationNamespace,
192-
action,
193-
revision,
194-
ContainerCreationError.TimeoutError,
195-
s"timeout waiting for the ack of $creationId after $timeout")))
186+
.foreach(_ =>
187+
sendState(FailedCreationJob(
188+
creationId,
189+
invocationNamespace,
190+
action,
191+
revision,
192+
ContainerCreationError.TimeoutError,
193+
s"[$action] timeout waiting for the ack of $creationId after $timeout")))
196194
dataManagementService ! UnregisterData(
197195
inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId))
198196
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
384384
logging.info(
385385
this,
386386
s"[$invocationNamespace:$action:$stateName] The queue received StopSchedulingAsOutdated trying to stop the queue.")
387+
388+
handleStaleActivationsWhenActionUpdated(context.parent)
389+
387390
cleanUpActorsAndGotoRemovedIfPossible(data.copy(outdated = true))
388391
}
389392

@@ -561,6 +564,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
561564
// let QueueManager know this queue is no longer in charge.
562565
context.parent ! staleQueueRemovedMsg
563566

567+
handleStaleActivationsWhenActionUpdated(context.parent)
568+
564569
goto(Removing) using getRemovingData(data, outdated = true)
565570

566571
case Event(t: FailureMessage, _) =>
@@ -829,6 +834,23 @@ class MemoryQueue(private val etcdClient: EtcdClient,
829834
}
830835
}
831836

837+
838+
private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = {
839+
if (queue.size > 0) {
840+
// if no old container exists to pull the old memoryQueue's activations, send the old activations to queueManager
841+
if (containers.size == 0) {
842+
logging.warn(
843+
this,
844+
s"[$invocationNamespace:$action:$stateName] does not exist old version container to fetch the old version activation")
845+
forwardAllActivations(queueManager)
846+
} else {
847+
logging.info(
848+
this,
849+
s"[$invocationNamespace:$action:$stateName] old version activation would be fetched by old version container")
850+
}
851+
}
852+
}
853+
832854
private def completeAllActivations(reason: String, isWhiskError: Boolean): Unit = {
833855
while (queue.nonEmpty) {
834856
val (TimeSeriesActivationEntry(_, msg), newQueue) = queue.dequeue

tests/src/test/resources/application.conf.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ whisk {
161161
max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
162162
}
163163
max-peek = "{{ scheduler.maxPeek }}"
164-
in-progress-job-retention = "{{ scheduler.inProgressJobRetentionSecond | default('20 seconds') }}"
164+
in-progress-job-retention = "{{ scheduler.inProgressJobRetention | default('20 seconds') }}"
165165
}
166166
}
167167

tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class CreationJobManagerTests
5555
with BeforeAndAfterEach
5656
with StreamLogging {
5757

58-
private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
58+
private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
5959
val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
6060
implicit val ece: ExecutionContextExecutor = system.dispatcher
6161
val config = new WhiskConfig(ExecManifest.requiredProperties)
@@ -322,7 +322,7 @@ class CreationJobManagerTests
322322
registerMessage.msg.action,
323323
registerMessage.msg.revision,
324324
ContainerCreationError.TimeoutError,
325-
s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
325+
s"[${registerMessage.msg.action}] timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
326326
}
327327

328328
it should "increase the timeout if an action is a blackbox action" in {
@@ -373,7 +373,7 @@ class CreationJobManagerTests
373373
registerMessage.msg.action,
374374
registerMessage.msg.revision,
375375
ContainerCreationError.TimeoutError,
376-
s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
376+
s"[${registerMessage.msg.action}] timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
377377
}
378378

379379
it should "delete a creation job with too many retry and send a FailedCreationJob to a queue" in {

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,7 @@ import org.apache.openwhisk.core.connector.ContainerCreationMessage
1010
import org.apache.openwhisk.core.entity._
1111
import org.apache.openwhisk.core.etcd.EtcdClient
1212
import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse
13-
import org.apache.openwhisk.core.scheduler.message.{
14-
ContainerCreation,
15-
ContainerDeletion,
16-
FailedCreationJob,
17-
SuccessfulCreationJob
18-
}
13+
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
1914
import org.apache.openwhisk.core.scheduler.queue.MemoryQueue.checkToDropStaleActivation
2015
import org.apache.openwhisk.core.scheduler.queue._
2116
import org.apache.openwhisk.core.service._
@@ -27,6 +22,8 @@ import org.scalatest.concurrent.ScalaFutures
2722
import org.scalatest.junit.JUnitRunner
2823
import spray.json.{JsObject, JsString}
2924

25+
import java.time.Instant
26+
import scala.collection.immutable.Queue
3027
import scala.concurrent.Future
3128
import scala.concurrent.duration.DurationInt
3229
import scala.language.postfixOps
@@ -1399,6 +1396,7 @@ class MemoryQueueFlowTests
13991396

14001397
// this is to guarantee StopScheduling is handled in all states
14011398
it should "handle StopScheduling in any states." in {
1399+
val testContainerId = "fakeContainerId"
14021400
val allStates =
14031401
List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed)
14041402

@@ -1461,6 +1459,7 @@ class MemoryQueueFlowTests
14611459
fsm.setState(state, FlushingData(schedulingActors.ref, schedulingActors.ref, WhiskError, "whisk error"))
14621460

14631461
case Removing =>
1462+
fsm.underlyingActor.containers = Set(testContainerId)
14641463
fsm ! message
14651464
fsm.setState(state, RemovingData(schedulingActors.ref, schedulingActors.ref, outdated = true))
14661465

@@ -1491,15 +1490,18 @@ class MemoryQueueFlowTests
14911490
fsm ! StopSchedulingAsOutdated
14921491

14931492
state match {
1494-
// queue will be gracefully shutdown.
14951493
case Removing =>
1496-
// queue should not be terminated as there is an activation
1497-
Thread.sleep(gracefulShutdownTimeout.toMillis)
1498-
1494+
// an old container still exists for the old queue, so the activation is fetched by the old container
14991495
container.send(fsm, getActivation())
15001496
container.expectMsg(ActivationResponse(Right(message)))
1501-
1502-
// queue should not be terminated as there is an activation
1497+
// no old containers remain for the old queue, so the message is sent to queueManager
1498+
fsm.underlyingActor.containers = Set.empty[String]
1499+
fsm.underlyingActor.queue =
1500+
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(Instant.now.toEpochMilli + 1000), message))
1501+
fsm ! StopSchedulingAsOutdated
1502+
parent.expectMsg(message)
1503+
1504+
// queue should be terminated after gracefulShutdownTimeout
15031505
Thread.sleep(gracefulShutdownTimeout.toMillis)
15041506

15051507
// clean up actors only because etcd data is being used by a new queue
@@ -1534,20 +1536,13 @@ class MemoryQueueFlowTests
15341536
probe.expectTerminated(fsm, 10.seconds)
15351537

15361538
case _ =>
1537-
// queue is stale and will be removed
15381539
parent.expectMsg(staleQueueRemovedMsg)
1540+
parent.expectMsg(message)
1541+
// queue is stale and will be removed
15391542
probe.expectMsg(Transition(fsm, state, Removing))
15401543

15411544
fsm ! QueueRemovedCompleted
15421545

1543-
// queue should not be terminated as there is an activation
1544-
Thread.sleep(gracefulShutdownTimeout.toMillis)
1545-
1546-
container.send(fsm, getActivation())
1547-
container.expectMsg(ActivationResponse(Right(message)))
1548-
1549-
Thread.sleep(gracefulShutdownTimeout.toMillis)
1550-
15511546
watcher.expectMsgAllOf(
15521547
UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName),
15531548
UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName),

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,93 @@ class MemoryQueueTests
989989
parent.expectMsg(Transition(fsm, Flushing, Running))
990990
probe.expectNoMessage(2.seconds)
991991
fsm.stop()
992+
}
993+
994+
it should "send old version activation to queueManager when update action if doesn't exist old version container" in {
995+
val mockEtcdClient = mock[EtcdClient]
996+
val probe = TestProbe()
997+
val queueManager = TestProbe()
998+
999+
expectDurationChecking(mockEsClient, testInvocationNamespace)
1000+
1001+
val fsm =
1002+
TestFSMRef(
1003+
new MemoryQueue(
1004+
mockEtcdClient,
1005+
durationChecker,
1006+
fqn,
1007+
mockMessaging(),
1008+
config,
1009+
testInvocationNamespace,
1010+
revision,
1011+
endpoints,
1012+
actionMetadata,
1013+
probe.ref,
1014+
probe.ref,
1015+
probe.ref,
1016+
TestProbe().ref,
1017+
schedulerId,
1018+
ack,
1019+
store,
1020+
getUserLimit,
1021+
checkToDropStaleActivation,
1022+
queueConfig),
1023+
queueManager.ref)
1024+
1025+
val now = Instant.now
1026+
fsm.underlyingActor.queue =
1027+
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
1028+
fsm.underlyingActor.containers = Set.empty[String]
1029+
fsm.setState(Running, RunningData(probe.ref, probe.ref))
1030+
fsm ! StopSchedulingAsOutdated // update action
1031+
queueManager.expectMsg(staleQueueRemovedMsg)
1032+
queueManager.expectMsg(message)
1033+
fsm.stop()
1034+
}
1035+
1036+
it should "fetch old version activation by old container when update action" in {
1037+
val mockEtcdClient = mock[EtcdClient]
1038+
val probe = TestProbe()
1039+
val queueManager = TestProbe()
1040+
val tid = TransactionId(TransactionId.generateTid())
1041+
1042+
expectDurationChecking(mockEsClient, testInvocationNamespace)
1043+
1044+
val fsm =
1045+
TestFSMRef(
1046+
new MemoryQueue(
1047+
mockEtcdClient,
1048+
durationChecker,
1049+
fqn,
1050+
mockMessaging(),
1051+
config,
1052+
testInvocationNamespace,
1053+
revision,
1054+
endpoints,
1055+
actionMetadata,
1056+
probe.ref,
1057+
probe.ref,
1058+
probe.ref,
1059+
TestProbe().ref,
1060+
schedulerId,
1061+
ack,
1062+
store,
1063+
getUserLimit,
1064+
checkToDropStaleActivation,
1065+
queueConfig),
1066+
queueManager.ref)
9921067

1068+
val now = Instant.now
1069+
fsm.underlyingActor.queue =
1070+
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
1071+
fsm.underlyingActor.containers = Set(testContainerId)
1072+
fsm.setState(Running, RunningData(probe.ref, probe.ref))
1073+
fsm ! StopSchedulingAsOutdated // update action
1074+
queueManager.expectMsg(staleQueueRemovedMsg)
1075+
(fsm ? GetActivation(tid, fqn, testContainerId, false, None))
1076+
.mapTo[GetActivationResponse]
1077+
.futureValue shouldBe GetActivationResponse(Right(message))
1078+
fsm.stop()
9931079
}
9941080

9951081
it should "stop scheduling if the namespace does not exist" in {
@@ -1343,7 +1429,9 @@ class MemoryQueueTests
13431429

13441430
Thread.sleep(1000)
13451431
memoryQueue.containers.size shouldBe 1
1346-
memoryQueue.creationIds.size shouldBe 1
1432+
// the monit actor in memoryQueue may decide to create a container
1433+
memoryQueue.creationIds.size should be >= 1
1434+
memoryQueue.creationIds.size should be <= 2
13471435
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2
13481436
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2
13491437

@@ -1380,7 +1468,8 @@ class MemoryQueueTests
13801468

13811469
Thread.sleep(1000)
13821470
memoryQueue.containers.size shouldBe 2
1383-
memoryQueue.creationIds.size shouldBe 2
1471+
memoryQueue.creationIds.size should be >= 2
1472+
memoryQueue.creationIds.size should be <= 3
13841473
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
13851474
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4
13861475

@@ -1407,7 +1496,8 @@ class MemoryQueueTests
14071496

14081497
Thread.sleep(1000)
14091498
memoryQueue.containers.size shouldBe 2
1410-
memoryQueue.creationIds.size shouldBe 0
1499+
memoryQueue.creationIds.size should be >= 0
1500+
memoryQueue.creationIds.size should be <= 1
14111501
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
14121502
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
14131503

@@ -1452,11 +1542,13 @@ class MemoryQueueTests
14521542
Some(ContainerId("test-containerId4"))),
14531543
"test-value")
14541544

1455-
memoryQueue.creationIds.size shouldBe 0
1545+
memoryQueue.creationIds.size should be >= 0
1546+
memoryQueue.creationIds.size should be <= 1
14561547

14571548
Thread.sleep(1000)
14581549
memoryQueue.containers.size shouldBe 0
1459-
memoryQueue.creationIds.size shouldBe 1 //if there is no container, the queue tries to create one container
1550+
memoryQueue.creationIds.size should be >= 1 // if there is no container, the queue tries to create one container
1551+
memoryQueue.creationIds.size should be <= 2
14601552
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
14611553
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
14621554
}

0 commit comments

Comments
 (0)