diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf index dfde945a003..1eb4eb5805c 100644 --- a/core/controller/src/main/resources/reference.conf +++ b/core/controller/src/main/resources/reference.conf @@ -31,7 +31,7 @@ whisk { timeout-addon = 1m fpc { - use-perMin-throttles = false + use-per-min-throttles = false } } controller { diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala index bd72c8cb629..576d3b3b1a6 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.loadBalancer import java.nio.charset.StandardCharsets diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala index f36f1f82f53..0bbc1d98bb0 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.scheduler.queue import akka.actor.{Actor, ActorSystem, Props} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala new file mode 100644 index 00000000000..06724e5f0f1 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.controller.test + +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.controller.RejectRequest +import org.apache.openwhisk.core.entitlement.{EntitlementProvider, FPCEntitlementProvider, Privilege, Resource} +import org.apache.openwhisk.core.entitlement.Privilege.{ACTIVATE, DELETE, PUT, READ, REJECT} +import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName} +import org.apache.openwhisk.core.loadBalancer.LoadBalancer +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class FPCEntitlementProviderTests extends ControllerTestCommon with ScalaFutures with MockFactory { + + implicit val transactionId = TransactionId.testing + + it should "get throttle flag from loadBalancer" in { + val someUser = WhiskAuthHelpers.newIdentity() + val action = FullyQualifiedEntityName(EntityPath("testns"), EntityName("action")) + val loadBalancer = mock[LoadBalancer] + (loadBalancer + .checkThrottle(_: EntityPath, _: String)) + .expects(someUser.namespace.name.toPath, action.fullPath.asString) + .returning(true) + val resources = Set(Resource(action.path, ACTIONS, Some(action.name.name))) + + val entitlementProvider: EntitlementProvider = new FPCEntitlementProvider(whiskConfig, loadBalancer, instance) + entitlementProvider.checkThrottles(someUser, ACTIVATE, resources).failed.futureValue shouldBe a[RejectRequest] + + Seq[Privilege](READ, PUT, DELETE, REJECT).foreach(OP => { + noException shouldBe thrownBy(entitlementProvider.checkThrottles(someUser, OP, resources).futureValue) + }) + + val action2 = FullyQualifiedEntityName(EntityPath("testns2"), EntityName("action2")) + val resources2 = Set(Resource(action2.path, ACTIONS, Some(action2.name.name))) + (loadBalancer + .checkThrottle(_: EntityPath, _: String)) + .expects(someUser.namespace.name.toPath, action2.fullPath.asString) + .returning(false) + noException shouldBe thrownBy(entitlementProvider.checkThrottles(someUser, ACTIVATE, resources2).futureValue) + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala index 98342fc2255..81aa708b427 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.loadBalancer.test import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala index 6c17346da7d..a00b632c8d2 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala @@ -49,7 +49,7 @@ import org.apache.openwhisk.core.scheduler.message.{ SuccessfulCreationJob } import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool} -import org.apache.openwhisk.core.service.WatchEndpointInserted +import org.apache.openwhisk.core.service.{WatchEndpointInserted, WatchEndpointRemoved} import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.junit.runner.RunWith import org.scalamock.scalatest.MockFactory @@ -77,6 +77,7 @@ class ContainerManagerTests with StreamLogging { val config = new WhiskConfig(ExecManifest.requiredProperties) + ExecManifest.initialize(config) val testInvocationNamespace = "test-invocation-namespace" val testNamespace = "test-namespace" @@ -278,11 +279,16 @@ class ContainerManagerTests ) expectGetInvokers(mockEtcd, invokers) expectGetInvokers(mockEtcd, invokers) - expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` twice, and another one for warmup + expectGetInvokers(mockEtcd, invokers) + expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` for 3 times, and another one for warmup val mockJobManager = TestProbe() val mockWatcher = TestProbe() val receiver = TestProbe() + // ignore warmUp message + receiver.ignoreMsg { + case s: String => s.contains("warmUp") + } val manager = system.actorOf(ContainerManager @@ -345,11 +351,6 @@ class ContainerManagerTests // it should reuse 2 warmed containers manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace) - // ignore warmUp message - receiver.ignoreMsg { - case s: String => s.contains("warmUp") - } - // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker receiver.expectMsg(s"invoker0-$msg1") receiver.expectMsg(s"invoker1-$msg2") @@ -371,10 +372,30 @@ class ContainerManagerTests manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace) receiver.expectMsg(s"invoker1-$msg2") - // warmed container for action1 become warmed - manager ! SuccessfulCreationJob(msg1.creationId, msg1.invocationNamespace, msg1.action, msg1.revision) + // warmed container for action1 become warmed when received FailedCreationJob + manager ! FailedCreationJob( + msg1.creationId, + msg1.invocationNamespace, + msg1.action, + msg1.revision, + NoAvailableResourceInvokersError, + "") manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace) receiver.expectMsg(s"invoker0-$msg1") + + // warmed container for action1 become unwarmed + manager ! WatchEndpointRemoved( + ContainerKeys.warmedPrefix, + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn, + testRevision, + InvokerInstanceId(0, userMemory = 0.bytes), + ContainerId("fake")), + "", + true) + manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace) + receiver.expectMsg(s"invoker2-$msg1") } it should "not try warmed containers if revision is unmatched" in { @@ -392,6 +413,10 @@ class ContainerManagerTests val mockJobManager = TestProbe() val mockWatcher = TestProbe() val receiver = TestProbe() + // ignore warmUp message + receiver.ignoreMsg { + case s: String => s.contains("warmUp") + } val manager = system.actorOf(ContainerManager @@ -423,11 +448,6 @@ class ContainerManagerTests // it should not reuse the warmed container manager ! ContainerCreation(List(msg), 128.MB, testInvocationNamespace) - // ignore warmUp message - receiver.ignoreMsg { - case s: String => s.contains("warmUp") - } - // it should be scheduled to the sole health invoker: invoker2 receiver.expectMsg(s"invoker2-$msg") @@ -686,6 +706,52 @@ class ContainerManagerTests "No available invokers with resources List(fake).")) } + it should "choose tagged invokers when no invokers available which has no tags first" in { + val msg = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn.resolve(EntityName("ns1")), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg2 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn.resolve(EntityName("ns2")), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + val probe = TestProbe() + QueuePool.put( + MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)), + MemoryQueueValue(probe.ref, true)) + + val healthyInvokers: List[InvokerHealth] = + List(InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq("cpu", "memory")), Healthy)) + + // there is no available invokers which has no tags, it should choose tagged invokers for msg + // and for msg2, it should return no available invokers + val pairs = + ContainerManager.schedule(healthyInvokers, List(msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB) + pairs should contain theSameElementsAs List(ScheduledPair(msg, healthyInvokers(0).id)) + + probe.expectMsg( + FailedCreationJob( + msg2.creationId, + testInvocationNamespace, + msg2.action, + testRevision, + NoAvailableInvokersError, + "No available invokers.")) + } + it should "respect the resource policy while use resource filter" in { val msg1 = ContainerCreationMessage( @@ -730,6 +796,21 @@ class ContainerManagerTests testsid, schedulerHost, rpcPort) + val msg4 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn.resolve(EntityName("ns3")), + testRevision, + actionMetadata.copy( + limits = action.limits.copy(memory = MemoryLimit(512.MB)), + annotations = + Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters( + Annotations.InvokerResourcesStrictPolicyAnnotationName, + JsBoolean(false))), + testsid, + schedulerHost, + rpcPort) val probe = TestProbe() QueuePool.put( @@ -738,7 +819,7 @@ class ContainerManagerTests val healthyInvokers: List[InvokerHealth] = List( InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy), - InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy)) + InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq("cpu", "memory")), Healthy)) // while resourcesStrictPolicy is true, and there is no suitable invokers, return an error val pairs = @@ -760,12 +841,20 @@ class ContainerManagerTests pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id)) // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first, - // if there is none, then choose other invokers, here is the invoker1 + // if there is none, then choose invokers with other tags, if there is still none, return no available invokers val pairs3 = ContainerManager.schedule( healthyInvokers.takeRight(1), - List(msg3), + List(msg3, msg4), msg3.whiskActionMetaData.limits.memory.megabytes.MB) pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id)) + probe.expectMsg( + FailedCreationJob( + msg4.creationId, + testInvocationNamespace, + msg4.action, + testRevision, + NoAvailableInvokersError, + "No available invokers.")) } it should "send FailedCreationJob to queue manager when no invokers are available" in { @@ -991,6 +1080,81 @@ class ContainerManagerTests result.take(m) shouldBe result.takeRight(b) } + + behavior of "warm up" + + it should "warm up all invokers when start" in { + val mockEtcd = mock[EtcdClient] + + val invokers: List[InvokerHealth] = List( + InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy), + ) + expectGetInvokers(mockEtcd, invokers) + + val mockJobManager = TestProbe() + val mockWatcher = TestProbe() + val receiver = TestProbe() + + val manager = + system.actorOf(ContainerManager + .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref)) + + (0 to 2).foreach(i => { + receiver.expectMsgPF() { + case msg: String if msg.contains("warmUp") && msg.contains(s"invoker$i") => true + case msg => false + } + }) + } + + it should "warm up new invoker when new one is registered" in { + val mockEtcd = mock[EtcdClient] + expectGetInvokers(mockEtcd, List.empty) + + val mockJobManager = TestProbe() + val mockWatcher = TestProbe() + val receiver = TestProbe() + + val manager = + system.actorOf(ContainerManager + .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref)) + + manager ! WatchEndpointInserted( + InvokerKeys.prefix, + InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)), + "", + true) + receiver.expectMsgPF() { + case msg: String if msg.contains("warmUp") && msg.contains(s"invoker0") => true + case _ => false + } + + // shouldn't warmup again + manager ! WatchEndpointInserted( + InvokerKeys.prefix, + InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)), + "", + true) + receiver.expectNoMessage() + + // should warmup again since invoker0 is a new one + manager ! WatchEndpointRemoved( + InvokerKeys.prefix, + InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)), + "", + true) + manager ! WatchEndpointInserted( + InvokerKeys.prefix, + InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)), + "", + true) + receiver.expectMsgPF() { + case msg: String if msg.contains("warmUp") && msg.contains(s"invoker0") => true + case _ => false + } + } } @RunWith(classOf[JUnitRunner]) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala index 7aa43b0314d..b2042f256c9 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala @@ -33,7 +33,7 @@ import org.apache.openwhisk.core.scheduler.queue.{ NoMemoryQueue, QueuePool } -import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse} +import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers} @@ -64,7 +64,7 @@ class ActivationServiceImplTests private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout) - implicit val timeoutConfig = PatienceConfig(5.seconds) + implicit val timeoutConfig = PatienceConfig(10.seconds) behavior of "ActivationService" @@ -156,6 +156,28 @@ class ActivationServiceImplTests expectNoMessage(200.millis) } + it should "return NoActivationMessage if queue doesn't return response" in { + + val activationServiceImpl = ActivationServiceImpl() + + QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true)) + + val tid = TransactionId(TransactionId.generateTid()) + activationServiceImpl + .fetchActivation( + FetchRequest( + tid.serialize, + message.user.namespace.name.asString, + testFQN.serialize, + testDocRevision.serialize, + testContainerId, + false, + alive = true)) + .futureValue shouldBe FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize) + + expectMsg(GetActivation(tid, testFQN, testContainerId, false, None)) + } + it should "return NoActivationMessage if it is a warm-up action" in { val activationServiceImpl = ActivationServiceImpl() @@ -219,4 +241,33 @@ class ActivationServiceImplTests } } + it should "reschedule msg if related queue exist" in { + QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true)) + val activationServiceImpl = ActivationServiceImpl() + + activationServiceImpl + .rescheduleActivation( + RescheduleRequest( + message.user.namespace.name.asString, + testFQN.serialize, + testDocRevision.serialize, + message.serialize)) + .futureValue shouldBe RescheduleResponse(true) + + expectMsg(message) + } + + it should "not reschedule msg if queue doesn't exist" in { + val activationServiceImpl = ActivationServiceImpl() + + activationServiceImpl + .rescheduleActivation( + RescheduleRequest( + message.user.namespace.name.asString, + testFQN.serialize, + testDocRevision.serialize, + message.serialize)) + .futureValue shouldBe RescheduleResponse() + } + } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala index e9e6694e046..7b6cbefbed7 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala @@ -42,6 +42,7 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer import org.apache.openwhisk.core.scheduler.queue.NamespaceContainerCount import org.apache.openwhisk.core.service.{DeleteEvent, PutEvent, UnwatchEndpoint, WatchEndpoint, WatcherService} +import org.apache.openwhisk.utils.retry import org.junit.runner.RunWith import org.scalamock.scalatest.MockFactory import org.scalatest.concurrent.ScalaFutures @@ -49,7 +50,7 @@ import org.scalatest.{FlatSpecLike, Matchers} import org.scalatest.junit.JUnitRunner import scala.concurrent.Future -import scala.concurrent.duration.TimeUnit +import scala.concurrent.duration._ @RunWith(classOf[JUnitRunner]) class ContainerCounterTests @@ -247,11 +248,42 @@ class ContainerCounterTests NamespaceContainerCount.instances.clear() } - class MockEtcdClient(client: Client, isLeader: Boolean, leaseNotFound: Boolean = false, failedCount: Int = 1) + it should "update the number of containers correctly when multiple entries are inserted into etcd" in { + val mockEtcdClient = new MockEtcdClient(client, true, failedCount = 1) + val watcher = system.actorOf(WatcherService.props(mockEtcdClient)) + + val ns = NamespaceContainerCount(namespace, mockEtcdClient, watcher) + retry(() => { + ns.inProgressContainerNumByNamespace shouldBe 0 + ns.existingContainerNumByNamespace shouldBe 0 + }, 10, Some(100.milliseconds)) + + val invoker = "invoker0" + (0 to 100).foreach(i => { + mockEtcdClient.publishEvents( + EventType.PUT, + inProgressContainer(namespace, fqn, revision, schedulerId, CreationId(s"testId$i")), + "test-value") + }) + (0 to 100).foreach(i => { + mockEtcdClient.publishEvents( + EventType.PUT, + s"${ContainerKeys.existingContainers(namespace, fqn, DocRevision.empty)}/${invoker}/test-container$i", + "test-value") + }) + + retry(() => { + ns.inProgressContainerNumByNamespace shouldBe 101 + ns.existingContainerNumByNamespace shouldBe 101 + }, 50, Some(100.milliseconds)) + } + + class MockEtcdClient(client: Client, isLeader: Boolean, leaseNotFound: Boolean = false, failedCount: Int = 0) extends EtcdClient(client)(ec) { var count = 0 var storedValues = List.empty[(String, String, Long, Long)] var dataMap = Map[String, String]() + var totalFailedCount = 0 override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: Long): Future[TxnResponse] = { if (isLeader) { @@ -264,7 +296,12 @@ class ContainerCounterTests * this method count the number of entries whose key starts with the given prefix */ override def getCount(prefixKey: String): Future[Long] = { - Future.successful { dataMap.count(data => data._1.startsWith(prefixKey)) } + if (totalFailedCount < failedCount) { + totalFailedCount += 1 + Future.failed(new Exception("error")) + } else { + Future.successful { dataMap.count(data => data._1.startsWith(prefixKey)) } + } } var watchCallbackMap = Map[String, WatchUpdate => Unit]() diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala index 735506d5bde..1d523c20d32 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.scheduler.queue.test import akka.actor.ActorRef @@ -10,7 +27,12 @@ import org.apache.openwhisk.core.connector.ContainerCreationMessage import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.etcd.EtcdClient import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse -import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob} +import org.apache.openwhisk.core.scheduler.message.{ + ContainerCreation, + ContainerDeletion, + FailedCreationJob, + SuccessfulCreationJob +} import org.apache.openwhisk.core.scheduler.queue.MemoryQueue.checkToDropStaleActivation import org.apache.openwhisk.core.scheduler.queue._ import org.apache.openwhisk.core.service._ @@ -47,6 +69,7 @@ class MemoryQueueFlowTests override def afterEach(): Unit = { super.afterEach() logLines.foreach(println) + stream.reset() } behavior of "MemoryQueueFlow" @@ -331,9 +354,7 @@ class MemoryQueueFlowTests fsm.underlyingActor.queue.size shouldBe 0 }, 5.seconds) - Thread.sleep(flushGrace.toMillis) - - parent.expectMsg(queueRemovedMsg) + parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg) probe.expectMsg(Transition(fsm, Flushing, Removed)) fsm ! QueueRemovedCompleted diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index 887e75a937d..c75a400fcbb 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.scheduler.queue.test import java.time.Instant diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala index ceaf3dd7b46..8c4ee849f5b 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.scheduler.queue.test import java.time.Instant diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala index 208702506e7..6ad1513f754 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala @@ -19,23 +19,23 @@ package org.apache.openwhisk.core.scheduler.queue.test import java.time.{Clock, Instant} import java.util.concurrent.atomic.AtomicInteger - import akka.actor.{Actor, ActorIdentity, ActorRef, ActorRefFactory, ActorSystem, Identify, Props} import akka.pattern.ask import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe} import akka.util.Timeout -import com.ibm.etcd.api.RangeResponse +import com.ibm.etcd.api.{KeyValue, RangeResponse} import common.{LoggedFunction, StreamLogging} import org.apache.openwhisk.common.{GracefulShutdown, TransactionId} import org.apache.openwhisk.core.WarmUp.warmUpAction import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector.test.TestConnector -import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage} +import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery} import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext} import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys -import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader} +import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader} +import org.apache.openwhisk.core.etcd.EtcdType._ import org.apache.openwhisk.core.scheduler.grpc.test.CommonVariable import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation} import org.apache.openwhisk.core.scheduler.queue._ @@ -78,7 +78,7 @@ class QueueManagerTests val testQueueCreationMessage = CreateQueue(testInvocationNamespace, testFQN, testDocRevision, testActionMetaData) - val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 2552, 8080) + val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 8080, 2552) val mockConsumer = new TestConnector(s"scheduler${schedulerId.asString}", 4, true) val messageTransId = TransactionId(TransactionId.testing.meta.id) @@ -100,6 +100,12 @@ class QueueManagerTests ControllerInstanceId("0"), blocking = false, content = None) + val statusData = StatusData(testInvocationNamespace, testFQN.asString, 0, "Running", "RunningData") + + // update start time for activation to ensure it's not stale + def newActivation(start: Instant = Instant.now()): ActivationMessage = { + activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = start))) + } val activationResponse = ActivationResponse(Right(activationMessage)) @@ -125,7 +131,9 @@ class QueueManagerTests system.actorOf(Props(new Actor() { override def receive: Receive = { case GetActivation(_, _, _, _, _, _) => - sender ! ActivationResponse(Right(activationMessage)) + sender ! ActivationResponse(Right(newActivation())) + case StatusQuery => + sender ! statusData } })) @@ -220,6 +228,34 @@ class QueueManagerTests true) } + it should "response queue creation request when failed to do election" in { + val mockEtcdClient = mock[EtcdClient] + val dataManagementService = getTestDataManagementService(false) + val watcher = TestProbe() + + val queueManager = + TestActorRef( + QueueManager + .props( + entityStore, + get, + mockEtcdClient, + schedulerEndpoint, + schedulerId, + dataManagementService.ref, + watcher.ref, + ack, + store, + childFactory, + mockConsumer)) + + watcher.expectMsg(watchEndpoint) + (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse( + testInvocationNamespace, + testFQN, + true) + } + it should "not create a queue if there is already a queue for the given fqn" in { val mockEtcdClient = mock[EtcdClient] val dataManagementService = getTestDataManagementService() @@ -298,12 +334,16 @@ class QueueManagerTests probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true)) } - private def getTestDataManagementService() = { + private def getTestDataManagementService(success: Boolean = true) = { val dataManagementService = TestProbe() dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) => msg match { case ElectLeader(key, value, _, _) => - sender ! ElectionResult(Right(EtcdLeader(key, value, 10))) + if (success) { + sender ! ElectionResult(Right(EtcdLeader(key, value, 10))) + } else { + sender ! ElectionResult(Left(EtcdFollower(key, value))) + } TestActor.KeepRunning case _ => @@ -312,6 +352,61 @@ class QueueManagerTests dataManagementService } + it should "forward msg to remote queue when queue exist on remote" in { + stream.reset() + val leaderKey = QueueKeys.queue( + activationMessage.user.namespace.name.asString, + activationMessage.action.copy(version = None), + true) + val mockEtcdClient = mock[EtcdClient] + (mockEtcdClient + .get(_: String)) + .expects(*) + .returning( + Future.successful( + RangeResponse + .newBuilder() + .addKvs(KeyValue.newBuilder().setKey(leaderKey).setValue(schedulerEndpoint.serialize).build()) + .build())) + .once() + val dataManagementService = getTestDataManagementService() + val watcher = TestProbe() + + val probe = TestProbe() + + val childFactory = + (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref + + val queueManager = + TestActorRef( + QueueManager + .props( + entityStore, + get, + mockEtcdClient, + schedulerEndpoint, + schedulerId, + dataManagementService.ref, + watcher.ref, + ack, + store, + childFactory, + mockConsumer)) + watcher.expectMsg(watchEndpoint) + + // got a message but no queue created on this scheduler + // it should try to got leader key from etcd and forward this msg to remote queue, here is `schedulerEndpoints` + queueManager ! newActivation() + stream.toString should include(s"send activation to remote queue, key: $leaderKey") + stream.toString should include(s"add a new actor selection to a map with key: $leaderKey") + stream.reset() + + // got msg again, and it should get remote queue from memory instead of etcd + val msg2 = newActivation().copy(activationId = ActivationId.generate()) + queueManager ! msg2 + stream.toString shouldNot include(s"send activation to remote queue, key: $leaderKey") + } + it should "create a new MemoryQueue when the revision matches with the one in a datastore" in { val mockEtcdClient = mock[EtcdClient] val dataManagementService = getTestDataManagementService() @@ -367,10 +462,14 @@ class QueueManagerTests content = None) queueManager ! activationMessage - queueManager ! activationMessage.copy(activationId = ActivationId.generate()) // even send two requests, we should only create one queue + val msgs = (0 to 10).map(i => { + activationMessage.copy(activationId = ActivationId.generate()) + }) + msgs.foreach(msg => queueManager ! msg) // even send multiple requests, we should only create new queue for once probe.expectMsg(StopSchedulingAsOutdated) probe.expectMsg(VersionUpdated) probe.expectMsg(activationMessage) + msgs.foreach(msg => probe.expectMsg(msg)) } it should "create a new MemoryQueue correctly when the action is updated again during updating the queue" in { @@ -453,6 +552,15 @@ class QueueManagerTests it should "not skip outdated activation when the revision is older than the one in a datastore" in { stream.reset() val mockEtcdClient = mock[EtcdClient] + (mockEtcdClient + .get(_: String)) + .expects(*) + .returning( + Future.successful( + RangeResponse + .newBuilder() + .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build()) + .build())) val dataManagementService = getTestDataManagementService() val watcher = TestProbe() @@ -491,7 +599,7 @@ class QueueManagerTests true) //the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision) - queueManager ! activationMessage + queueManager ! newActivation() stream.toString should include(s"it will be replaced with the latest revision and invoked") } @@ -522,7 +630,7 @@ class QueueManagerTests mockConsumer, QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds))) - queueManager ! activationMessage + queueManager ! newActivation() Thread.sleep(100) (mockEtcdClient.get _) verify (*) repeated (3) } @@ -554,11 +662,84 @@ class QueueManagerTests mockConsumer, QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds))) - queueManager ! activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = Instant.now()))) + queueManager ! newActivation() Thread.sleep(100) (mockEtcdClient.get _) verify (*) repeated (3) } + it should "save queue endpoint in memory" in { + stream.reset() + + val mockEtcdClient = stub[EtcdClient] + val dataManagementService = getTestDataManagementService() + dataManagementService.ignoreMsg { + case _: UpdateDataOnChange => true + } + val watcher = TestProbe() + + val emptyResult = Future.successful(RangeResponse.newBuilder().build()) + (mockEtcdClient.get _) when (*) returns (emptyResult) + + val queueManager = + TestActorRef( + new QueueManager( + entityStore, + get, + mockEtcdClient, + schedulerEndpoint, + schedulerId, + dataManagementService.ref, + watcher.ref, + ack, + store, + childFactory, + mockConsumer, + QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds))) + + queueManager ! WatchEndpointInserted("queue", "queue/test-action/leader", schedulerEndpoint.serialize, true) + stream.toString should include(s"Endpoint inserted, key: queue/test-action/leader, endpoints: ${schedulerEndpoint}") + stream.reset() + + queueManager ! WatchEndpointInserted("queue", "queue/test-action/leader", "host with wrong format", true) + stream.toString should include(s"Unexpected error") + stream.toString should include(s"when put leaderKey: queue/test-action/leader") + stream.reset() + + queueManager ! WatchEndpointRemoved("queue", "queue/test-action/leader", schedulerEndpoint.serialize, true) + stream.toString should include(s"Endpoint removed for key: queue/test-action/leader") + } + + it should "able to query queue status" in { + val mockEtcdClient = mock[EtcdClient] + val watcher = TestProbe() + val dataManagementService = getTestDataManagementService() + val queueManager = + TestActorRef( + QueueManager + .props( + entityStore, + get, + mockEtcdClient, + schedulerEndpoint, + schedulerId, + dataManagementService.ref, + watcher.ref, + ack, + store, + childFactory, + mockConsumer)) + + watcher.expectMsg(watchEndpoint) + (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse( + testInvocationNamespace, + testFQN, + true) + + (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1 + + (queueManager ? StatusQuery).mapTo[Future[Iterable[StatusData]]].futureValue.futureValue shouldBe List(statusData) + } + it should "drop the activation message that has not been scheduled for a long time" in { val mockEtcdClient = mock[EtcdClient] val watcher = TestProbe() @@ -577,7 +758,7 @@ class QueueManagerTests } val oldNow = Instant.now(Clock.systemUTC()).minusMillis(11000) - val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow))) + val oldActivationMessage = newActivation(oldNow) val queueManager = TestActorRef( @@ -606,6 +787,15 @@ class QueueManagerTests it should "not drop the unscheduled activation message that has been processed within the scheduling time limit." in { val mockEtcdClient = mock[EtcdClient] + (mockEtcdClient + .get(_: String)) + .expects(*) + .returning( + Future.successful( + RangeResponse + .newBuilder() + .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build()) + .build())) val watcher = TestProbe() val probe = TestProbe() val dataManagementService = getTestDataManagementService() @@ -622,7 +812,7 @@ class QueueManagerTests } val oldNow = Instant.now(Clock.systemUTC()).minusMillis(9000) - val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow))) + val oldActivationMessage = newActivation(oldNow) val queueManager = TestActorRef( @@ -692,7 +882,7 @@ class QueueManagerTests true) queueManager.tell( - UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage), + UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, newActivation()), consumer.ref) probe.expectMsg(activationMessage.activationId) @@ -730,7 +920,7 @@ class QueueManagerTests testFQN.toDocId.asDocInfo(testDocRevision), Some(testLeaderKey)) - QueuePool.size shouldBe 0 + (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 0 } it should "put the queue back to pool if it receives a QueueReactive message" in { @@ -759,18 +949,18 @@ class QueueManagerTests testFQN, true) - QueuePool.size shouldBe 1 + (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1 queueManager ! QueueRemoved( testInvocationNamespace, testFQN.toDocId.asDocInfo(testDocRevision), Some(testLeaderKey)) - QueuePool.size shouldBe 0 + (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 0 queueManager ! QueueReactivated(testInvocationNamespace, testFQN, testFQN.toDocId.asDocInfo(testDocRevision)) - QueuePool.size shouldBe 1 + (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1 } it should "put pool information to data management service" in { @@ -899,7 +1089,7 @@ class QueueManagerTests system.actorOf(Props(new Actor() { override def receive: Receive = { case GetActivation(_, _, _, _, _, _) => - sender ! ActivationResponse(Right(activationMessage)) + sender ! ActivationResponse(Right(newActivation())) case GracefulShutdown => probe.ref ! GracefulShutdown diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala index 20771e6b3b4..83a4b6f8edd 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala @@ -1,15 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.openwhisk.core.scheduler.queue.test import java.util.concurrent.atomic.AtomicInteger - import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import common.StreamLogging import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName, SemVer} import org.apache.openwhisk.core.scheduler.queue._ +import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner import org.scalatest.{FlatSpecLike, Matchers} +@RunWith(classOf[JUnitRunner]) class SchedulingDecisionMakerTests extends TestKit(ActorSystem("SchedulingDecisionMakerTests")) with FlatSpecLike