-
Notifications
You must be signed in to change notification settings - Fork 1.2k
send old version memoryQueue's stale activation to queueManager when update action #5228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d521906
65e3254
2aa5d26
7715ad5
b347f41
acdce96
eed0119
691497c
f7f8fff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,12 +10,7 @@ import org.apache.openwhisk.core.connector.ContainerCreationMessage | |
import org.apache.openwhisk.core.entity._ | ||
import org.apache.openwhisk.core.etcd.EtcdClient | ||
import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse | ||
import org.apache.openwhisk.core.scheduler.message.{ | ||
ContainerCreation, | ||
ContainerDeletion, | ||
FailedCreationJob, | ||
SuccessfulCreationJob | ||
} | ||
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob} | ||
import org.apache.openwhisk.core.scheduler.queue.MemoryQueue.checkToDropStaleActivation | ||
import org.apache.openwhisk.core.scheduler.queue._ | ||
import org.apache.openwhisk.core.service._ | ||
|
@@ -27,6 +22,8 @@ import org.scalatest.concurrent.ScalaFutures | |
import org.scalatest.junit.JUnitRunner | ||
import spray.json.{JsObject, JsString} | ||
|
||
import java.time.Instant | ||
import scala.collection.immutable.Queue | ||
import scala.concurrent.Future | ||
import scala.concurrent.duration.DurationInt | ||
import scala.language.postfixOps | ||
|
@@ -1399,6 +1396,7 @@ class MemoryQueueFlowTests | |
|
||
// this is to guarantee StopScheduling is handled in all states | ||
it should "handle StopScheduling in any states." in { | ||
val testContainerId = "fakeContainerId" | ||
val allStates = | ||
List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed) | ||
|
||
|
@@ -1461,6 +1459,7 @@ class MemoryQueueFlowTests | |
fsm.setState(state, FlushingData(schedulingActors.ref, schedulingActors.ref, WhiskError, "whisk error")) | ||
|
||
case Removing => | ||
fsm.underlyingActor.containers = Set(testContainerId) | ||
fsm ! message | ||
fsm.setState(state, RemovingData(schedulingActors.ref, schedulingActors.ref, outdated = true)) | ||
|
||
|
@@ -1491,15 +1490,18 @@ class MemoryQueueFlowTests | |
fsm ! StopSchedulingAsOutdated | ||
|
||
state match { | ||
// queue will be gracefully shutdown. | ||
case Removing => | ||
// queue should not be terminated as there is an activation | ||
Thread.sleep(gracefulShutdownTimeout.toMillis) | ||
|
||
// still exist old container for old queue, fetch the queue by old container | ||
container.send(fsm, getActivation()) | ||
container.expectMsg(ActivationResponse(Right(message))) | ||
|
||
// queue should not be terminated as there is an activation | ||
// has no old containers for old queue, so send the message to queueManager | ||
fsm.underlyingActor.containers = Set.empty[String] | ||
fsm.underlyingActor.queue = | ||
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(Instant.now.toEpochMilli + 1000), message)) | ||
fsm ! StopSchedulingAsOutdated | ||
parent.expectMsg(message) | ||
|
||
// queue should be terminated after gracefulShutdownTimeout | ||
Thread.sleep(gracefulShutdownTimeout.toMillis) | ||
|
||
// clean up actors only because etcd data is being used by a new queue | ||
|
@@ -1534,20 +1536,13 @@ class MemoryQueueFlowTests | |
probe.expectTerminated(fsm, 10.seconds) | ||
|
||
case _ => | ||
// queue is stale and will be removed | ||
parent.expectMsg(staleQueueRemovedMsg) | ||
parent.expectMsg(message) | ||
// queue is stale and will be removed | ||
probe.expectMsg(Transition(fsm, state, Removing)) | ||
|
||
fsm ! QueueRemovedCompleted | ||
|
||
// queue should not be terminated as there is an activation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why these lines are removed? since there is a container in https://github.com/apache/openwhisk/pull/5228/files#diff-4b5788222d7949baf4e09b570e4fa49d3fa79e5432ff50c798b04353758c8d4aR1539, I think message should not be sent to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, fixed. |
||
Thread.sleep(gracefulShutdownTimeout.toMillis) | ||
|
||
container.send(fsm, getActivation()) | ||
container.expectMsg(ActivationResponse(Right(message))) | ||
|
||
Thread.sleep(gracefulShutdownTimeout.toMillis) | ||
|
||
watcher.expectMsgAllOf( | ||
UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName), | ||
UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -989,7 +989,93 @@ class MemoryQueueTests | |
parent.expectMsg(Transition(fsm, Flushing, Running)) | ||
probe.expectNoMessage(2.seconds) | ||
fsm.stop() | ||
} | ||
|
||
it should "send old version activation to queueManager when update action if doesn't exist old version container" in { | ||
val mockEtcdClient = mock[EtcdClient] | ||
val probe = TestProbe() | ||
val queueManager = TestProbe() | ||
|
||
expectDurationChecking(mockEsClient, testInvocationNamespace) | ||
|
||
val fsm = | ||
TestFSMRef( | ||
new MemoryQueue( | ||
mockEtcdClient, | ||
durationChecker, | ||
fqn, | ||
mockMessaging(), | ||
config, | ||
testInvocationNamespace, | ||
revision, | ||
endpoints, | ||
actionMetadata, | ||
probe.ref, | ||
probe.ref, | ||
probe.ref, | ||
TestProbe().ref, | ||
schedulerId, | ||
ack, | ||
store, | ||
getUserLimit, | ||
checkToDropStaleActivation, | ||
queueConfig), | ||
queueManager.ref) | ||
|
||
val now = Instant.now | ||
fsm.underlyingActor.queue = | ||
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message)) | ||
fsm.underlyingActor.containers = Set.empty[String] | ||
fsm.setState(Running, RunningData(probe.ref, probe.ref)) | ||
fsm ! StopSchedulingAsOutdated // update action | ||
queueManager.expectMsg(staleQueueRemovedMsg) | ||
queueManager.expectMsg(message) | ||
fsm.stop() | ||
} | ||
|
||
it should "fetch old version activation by old container when update action" in { | ||
val mockEtcdClient = mock[EtcdClient] | ||
val probe = TestProbe() | ||
val queueManager = TestProbe() | ||
val tid = TransactionId(TransactionId.generateTid()) | ||
|
||
expectDurationChecking(mockEsClient, testInvocationNamespace) | ||
|
||
val fsm = | ||
TestFSMRef( | ||
new MemoryQueue( | ||
mockEtcdClient, | ||
durationChecker, | ||
fqn, | ||
mockMessaging(), | ||
config, | ||
testInvocationNamespace, | ||
revision, | ||
endpoints, | ||
actionMetadata, | ||
probe.ref, | ||
probe.ref, | ||
probe.ref, | ||
TestProbe().ref, | ||
schedulerId, | ||
ack, | ||
store, | ||
getUserLimit, | ||
checkToDropStaleActivation, | ||
queueConfig), | ||
queueManager.ref) | ||
|
||
val now = Instant.now | ||
fsm.underlyingActor.queue = | ||
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message)) | ||
fsm.underlyingActor.containers = Set(testContainerId) | ||
fsm.setState(Running, RunningData(probe.ref, probe.ref)) | ||
fsm ! StopSchedulingAsOutdated // update action | ||
queueManager.expectMsg(staleQueueRemovedMsg) | ||
(fsm ? GetActivation(tid, fqn, testContainerId, false, None)) | ||
.mapTo[GetActivationResponse] | ||
.futureValue shouldBe GetActivationResponse(Right(message)) | ||
fsm.stop() | ||
} | ||
|
||
it should "stop scheduling if the namespace does not exist" in { | ||
|
@@ -1343,7 +1429,9 @@ class MemoryQueueTests | |
|
||
Thread.sleep(1000) | ||
memoryQueue.containers.size shouldBe 1 | ||
memoryQueue.creationIds.size shouldBe 1 | ||
// the monit actor in memoryQueue may decide to create a container | ||
memoryQueue.creationIds.size should be >= 1 | ||
memoryQueue.creationIds.size should be <= 2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Due to this codes: https://github.com/apache/openwhisk/blob/master/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala#L908 This test case may not stable, e.g. due to the So here, make it stable. |
||
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2 | ||
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2 | ||
|
||
|
@@ -1380,7 +1468,8 @@ class MemoryQueueTests | |
|
||
Thread.sleep(1000) | ||
memoryQueue.containers.size shouldBe 2 | ||
memoryQueue.creationIds.size shouldBe 2 | ||
memoryQueue.creationIds.size should be >= 2 | ||
memoryQueue.creationIds.size should be <= 3 | ||
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4 | ||
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4 | ||
|
||
|
@@ -1407,7 +1496,8 @@ class MemoryQueueTests | |
|
||
Thread.sleep(1000) | ||
memoryQueue.containers.size shouldBe 2 | ||
memoryQueue.creationIds.size shouldBe 0 | ||
memoryQueue.creationIds.size should be >= 0 | ||
memoryQueue.creationIds.size should be <= 1 | ||
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 | ||
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4 | ||
|
||
|
@@ -1452,11 +1542,13 @@ class MemoryQueueTests | |
Some(ContainerId("test-containerId4"))), | ||
"test-value") | ||
|
||
memoryQueue.creationIds.size shouldBe 0 | ||
memoryQueue.creationIds.size should be >= 0 | ||
memoryQueue.creationIds.size should be <= 1 | ||
|
||
Thread.sleep(1000) | ||
memoryQueue.containers.size shouldBe 0 | ||
memoryQueue.creationIds.size shouldBe 1 //if there is no container, the queue tries to create one container | ||
memoryQueue.creationIds.size should be >= 1 // if there is no container, the queue tries to create one container | ||
memoryQueue.creationIds.size should be <= 2 | ||
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 | ||
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0 | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it easy to debug in future maybe.