send old version memoryQueue's stale activation to queueManager when update action #5228

Merged
2 changes: 1 addition & 1 deletion ansible/group_vars/all
@@ -510,7 +510,7 @@ scheduler:
extraEnv: "{{ scheduler_extraEnv | default({}) }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
inProgressJobRetentionSecond: "{{ scheduler_inProgressJobRetentionSecond | default('20 seconds') }}"
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
queueManager:
2 changes: 1 addition & 1 deletion ansible/roles/schedulers/tasks/deploy.yml
@@ -112,7 +112,7 @@
"CONFIG_whisk_scheduler_protocol": "{{ scheduler.protocol }}"
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetentionSecond }}"
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
"CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
"CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
"CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"
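For context (not part of the diff): the CONFIG_-prefixed environment variables set in deploy.yml are translated by OpenWhisk's container startup into Typesafe-config paths, so the rename has to stay consistent across all three layers. Assuming the usual camelCase-to-kebab-case translation, the chain is:

// CONFIG_whisk_scheduler_inProgressJobRetention    (env var, deploy.yml above)
//   -> whisk.scheduler.in-progress-job-retention   (HOCON path)
//   -> ConfigKeys.schedulerInProgressJobRetention  (Scala constant, next hunk)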
@@ -302,7 +302,7 @@ object ConfigKeys {
val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"

val whiskClusterName = "whisk.cluster.name"

@@ -50,9 +50,8 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
extends Actor {
private implicit val ec: ExecutionContext = actorSystem.dispatcher
private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
private val retryLimit = 5
private val retryDelayTime = 100.milliseconds

/**
* Store a JobEntry in local to get an alarm for key timeout
@@ -83,7 +82,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
error,
reason)) =>
if (error.isEmpty) {
logging.info(this, s"[$creationId] create container successfully")
logging.info(this, s"[$action] [$creationId] create container successfully")
deleteJob(
invocationNamespace,
action,
@@ -97,7 +96,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) {
logging.error(
this,
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
s"[$action] [$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
Contributor Author
Make it easy to debug in the future, maybe.

// Delete from pool after all retries are failed
deleteJob(
invocationNamespace,
@@ -109,8 +108,9 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
// Reschedule
logging.error(
this,
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
// Add some time interval during retry create container, because etcd put operation needs some time if data inconsistant happens
s"[$action] [$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
// Add an exponentially increasing delay between container creation retries, because the etcd put operation needs some time when data inconsistency happens
val retryDelayTime = (scala.math.pow(2, retryCount) * 100).milliseconds
actorSystem.scheduler.scheduleOnce(retryDelayTime) {
context.parent ! ReschedulingCreationJob(
tid,
@@ -183,16 +183,14 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
s"Failed to create a container for $action(blackbox: $isBlackbox), error: $creationId timed out after $timeout")
creationJobPool
.remove(creationId)
.foreach(
_ =>
sendState(
FailedCreationJob(
creationId,
invocationNamespace,
action,
revision,
ContainerCreationError.TimeoutError,
s"timeout waiting for the ack of $creationId after $timeout")))
.foreach(_ =>
sendState(FailedCreationJob(
creationId,
invocationNamespace,
action,
revision,
ContainerCreationError.TimeoutError,
s"[$action] timeout waiting for the ack of $creationId after $timeout")))
dataManagementService ! UnregisterData(
inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId))
}
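As a sanity check on the rescheduling branch above, here is a minimal sketch (not part of the PR) of the delays the new formula produces, reusing the retryLimit = 5 from this file:

import scala.concurrent.duration._

// Sketch only: evaluates the delay formula from the diff outside the actor.
val retryLimit = 5
val delays = (0 until retryLimit).map(retryCount => (scala.math.pow(2, retryCount) * 100).milliseconds)
// delays: 100ms, 200ms, 400ms, 800ms, 1600ms -- about 3.1 seconds in total,
// comfortably inside the 20-second in-progress-job-retention window configured above.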
@@ -384,6 +384,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] The queue received StopSchedulingAsOutdated trying to stop the queue.")

handleStaleActivationsWhenActionUpdated(context.parent)

cleanUpActorsAndGotoRemovedIfPossible(data.copy(outdated = true))
}

@@ -561,6 +564,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// let QueueManager know this queue is no longer in charge.
context.parent ! staleQueueRemovedMsg

handleStaleActivationsWhenActionUpdated(context.parent)

goto(Removing) using getRemovingData(data, outdated = true)

case Event(t: FailureMessage, _) =>
@@ -829,6 +834,23 @@ class MemoryQueue(private val etcdClient: EtcdClient,
}
}


private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = {
if (queue.size > 0) {
// if no old-version container exists to pull this memoryQueue's activations, send the old activations to the queueManager
if (containers.size == 0) {
logging.warn(
this,
s"[$invocationNamespace:$action:$stateName] no old version container exists to fetch the old version activations")
forwardAllActivations(queueManager)
} else {
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] old version activations will be fetched by the old version containers")
}
}
}

private def completeAllActivations(reason: String, isWhiskError: Boolean): Unit = {
while (queue.nonEmpty) {
val (TimeSeriesActivationEntry(_, msg), newQueue) = queue.dequeue
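forwardAllActivations itself is outside this diff; judging from the completeAllActivations loop above, a rough sketch of what forwarding to the queueManager amounts to could look like this (names and details assumed, not taken from the PR):

// Hypothetical sketch, not the actual MemoryQueue implementation: drain the
// local queue and hand each stale activation to the QueueManager, which
// re-routes it to the queue that owns the new action version.
private def forwardAllActivationsSketch(queueManager: ActorRef): Unit = {
  while (queue.nonEmpty) {
    val (TimeSeriesActivationEntry(_, msg), newQueue) = queue.dequeue
    queue = newQueue
    queueManager ! msg
  }
}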
2 changes: 1 addition & 1 deletion tests/src/test/resources/application.conf.j2
@@ -161,7 +161,7 @@ whisk {
max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
}
max-peek = "{{ scheduler.maxPeek }}"
in-progress-job-retention = "{{ scheduler.inProgressJobRetentionSecond | default('20 seconds') }}"
in-progress-job-retention = "{{ scheduler.inProgressJobRetention | default('20 seconds') }}"
}
}

@@ -55,7 +55,7 @@ class CreationJobManagerTests
with BeforeAndAfterEach
with StreamLogging {

private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
implicit val ece: ExecutionContextExecutor = system.dispatcher
val config = new WhiskConfig(ExecManifest.requiredProperties)
@@ -322,7 +322,7 @@ class CreationJobManagerTests
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.TimeoutError,
s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
s"[${registerMessage.msg.action}] timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
}

it should "increase the timeout if an action is a blackbox action" in {
@@ -373,7 +373,7 @@ class CreationJobManagerTests
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.TimeoutError,
s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
s"[${registerMessage.msg.action}] timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
}

it should "delete a creation job with too many retry and send a FailedCreationJob to a queue" in {
@@ -10,12 +10,7 @@ import org.apache.openwhisk.core.connector.ContainerCreationMessage
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
FailedCreationJob,
SuccessfulCreationJob
}
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
import org.apache.openwhisk.core.scheduler.queue.MemoryQueue.checkToDropStaleActivation
import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.core.service._
@@ -27,6 +22,8 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import spray.json.{JsObject, JsString}

import java.time.Instant
import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
@@ -1399,6 +1396,7 @@ class MemoryQueueFlowTests

// this is to guarantee StopScheduling is handled in all states
it should "handle StopScheduling in any states." in {
val testContainerId = "fakeContainerId"
val allStates =
List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed)

@@ -1461,6 +1459,7 @@
fsm.setState(state, FlushingData(schedulingActors.ref, schedulingActors.ref, WhiskError, "whisk error"))

case Removing =>
fsm.underlyingActor.containers = Set(testContainerId)
fsm ! message
fsm.setState(state, RemovingData(schedulingActors.ref, schedulingActors.ref, outdated = true))

@@ -1491,15 +1490,18 @@
fsm ! StopSchedulingAsOutdated

state match {
// queue will be gracefully shut down.
case Removing =>
// queue should not be terminated as there is an activation
Thread.sleep(gracefulShutdownTimeout.toMillis)

// an old container still exists for the old queue, so the activation is fetched from the queue by the old container
container.send(fsm, getActivation())
container.expectMsg(ActivationResponse(Right(message)))

// queue should not be terminated as there is an activation
// no old containers exist for the old queue, so send the message to the queueManager
fsm.underlyingActor.containers = Set.empty[String]
fsm.underlyingActor.queue =
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(Instant.now.toEpochMilli + 1000), message))
fsm ! StopSchedulingAsOutdated
parent.expectMsg(message)

// queue should be terminated after gracefulShutdownTimeout
Thread.sleep(gracefulShutdownTimeout.toMillis)

// clean up actors only because etcd data is being used by a new queue
@@ -1534,20 +1536,13 @@
probe.expectTerminated(fsm, 10.seconds)

case _ =>
// queue is stale and will be removed
parent.expectMsg(staleQueueRemovedMsg)
parent.expectMsg(message)
// queue is stale and will be removed
probe.expectMsg(Transition(fsm, state, Removing))

fsm ! QueueRemovedCompleted

// queue should not be terminated as there is an activation
Contributor
Why are these lines removed? Since there is a container in https://github.com/apache/openwhisk/pull/5228/files#diff-4b5788222d7949baf4e09b570e4fa49d3fa79e5432ff50c798b04353758c8d4aR1539, I think the message should not be sent to the QueueManager?

Contributor Author
Oh, fixed.

Thread.sleep(gracefulShutdownTimeout.toMillis)

container.send(fsm, getActivation())
container.expectMsg(ActivationResponse(Right(message)))

Thread.sleep(gracefulShutdownTimeout.toMillis)

watcher.expectMsgAllOf(
UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName),
UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName),
@@ -989,7 +989,93 @@ class MemoryQueueTests
parent.expectMsg(Transition(fsm, Flushing, Running))
probe.expectNoMessage(2.seconds)
fsm.stop()
}

it should "send old version activation to queueManager when update action if doesn't exist old version container" in {
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val queueManager = TestProbe()

expectDurationChecking(mockEsClient, testInvocationNamespace)

val fsm =
TestFSMRef(
new MemoryQueue(
mockEtcdClient,
durationChecker,
fqn,
mockMessaging(),
config,
testInvocationNamespace,
revision,
endpoints,
actionMetadata,
probe.ref,
probe.ref,
probe.ref,
TestProbe().ref,
schedulerId,
ack,
store,
getUserLimit,
checkToDropStaleActivation,
queueConfig),
queueManager.ref)

val now = Instant.now
fsm.underlyingActor.queue =
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
fsm.underlyingActor.containers = Set.empty[String]
fsm.setState(Running, RunningData(probe.ref, probe.ref))
fsm ! StopSchedulingAsOutdated // update action
queueManager.expectMsg(staleQueueRemovedMsg)
queueManager.expectMsg(message)
fsm.stop()
}

it should "fetch old version activation by old container when update action" in {
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val queueManager = TestProbe()
val tid = TransactionId(TransactionId.generateTid())

expectDurationChecking(mockEsClient, testInvocationNamespace)

val fsm =
TestFSMRef(
new MemoryQueue(
mockEtcdClient,
durationChecker,
fqn,
mockMessaging(),
config,
testInvocationNamespace,
revision,
endpoints,
actionMetadata,
probe.ref,
probe.ref,
probe.ref,
TestProbe().ref,
schedulerId,
ack,
store,
getUserLimit,
checkToDropStaleActivation,
queueConfig),
queueManager.ref)

val now = Instant.now
fsm.underlyingActor.queue =
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
fsm.underlyingActor.containers = Set(testContainerId)
fsm.setState(Running, RunningData(probe.ref, probe.ref))
fsm ! StopSchedulingAsOutdated // update action
queueManager.expectMsg(staleQueueRemovedMsg)
(fsm ? GetActivation(tid, fqn, testContainerId, false, None))
.mapTo[GetActivationResponse]
.futureValue shouldBe GetActivationResponse(Right(message))
fsm.stop()
}

it should "stop scheduling if the namespace does not exist" in {
@@ -1343,7 +1429,9 @@

Thread.sleep(1000)
memoryQueue.containers.size shouldBe 1
memoryQueue.creationIds.size shouldBe 1
// the monitor actor in memoryQueue may decide to create a container
memoryQueue.creationIds.size should be >= 1
memoryQueue.creationIds.size should be <= 2
Contributor Author (@ningyougang, May 7, 2022)
Due to this code: https://github.com/apache/openwhisk/blob/master/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala#L908
That line is needed because storing the in-progress id to etcd takes some time, so when the containerCreationMessage is created, its creationId has to be added to creationIds in advance.
This test case may not be stable: the creationId (testId1) does not stay consistent with CreationId.generate(), so if the monitor actor decides to create a container very quickly, the test case would fail.
So here, make it stable.

memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2

@@ -1380,7 +1468,8 @@

Thread.sleep(1000)
memoryQueue.containers.size shouldBe 2
memoryQueue.creationIds.size shouldBe 2
memoryQueue.creationIds.size should be >= 2
memoryQueue.creationIds.size should be <= 3
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4

@@ -1407,7 +1496,8 @@

Thread.sleep(1000)
memoryQueue.containers.size shouldBe 2
memoryQueue.creationIds.size shouldBe 0
memoryQueue.creationIds.size should be >= 0
memoryQueue.creationIds.size should be <= 1
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4

@@ -1452,11 +1542,13 @@
Some(ContainerId("test-containerId4"))),
"test-value")

memoryQueue.creationIds.size shouldBe 0
memoryQueue.creationIds.size should be >= 0
memoryQueue.creationIds.size should be <= 1

Thread.sleep(1000)
memoryQueue.containers.size shouldBe 0
memoryQueue.creationIds.size shouldBe 1 //if there is no container, the queue tries to create one container
memoryQueue.creationIds.size should be >= 1 // if there is no container, the queue tries to create one container
memoryQueue.creationIds.size should be <= 2
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
}
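A side note on the ranged assertions introduced in this file: ScalaTest matchers also allow both bounds in a single statement, which reads slightly tighter (a sketch with the same semantics, not part of the PR):

// equivalent to the paired `should be >=` / `should be <=` assertions above
memoryQueue.creationIds.size should (be >= 1 and be <= 2)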