diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf index 1f696fc43b7..f04d2888f86 100644 --- a/core/scheduler/src/main/resources/application.conf +++ b/core/scheduler/src/main/resources/application.conf @@ -79,6 +79,8 @@ whisk { stale-threshold = "100 milliseconds" check-interval = "100 milliseconds" drop-interval = "10 seconds" + allow-over-provision-before-throttle = false + namespace-over-provision-before-throttle-ratio = 1.5 } queue { idle-grace = "20 seconds" diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index e3ed70d8a0e..f260feb4f37 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -421,4 +421,8 @@ object SchedulerStates extends DefaultJsonProtocol { def parse(states: String) = Try(serdes.read(states.parseJson)) } -case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration) +case class SchedulingConfig(staleThreshold: FiniteDuration, + checkInterval: FiniteDuration, + dropInterval: FiniteDuration, + allowOverProvisionBeforeThrottle: Boolean, + namespaceOverProvisionBeforeThrottleRatio: Double) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala index d6ae8f63e17..19c458fc9ed 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala @@ -70,7 +70,17 @@ class SchedulingDecisionMaker( case _ => Future.successful(DecisionResults(Pausing, 0)) } } 
else { - val capacity = limit - existingContainerCountInNs - inProgressContainerCountInNs + val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) { + // if the namespace is still below its over-provisioned limit (namespace limit * over-provision ratio), allow one + // container for this new action so namespace traffic can re-balance without blocking the entire action + if (ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0) { + 1 + } else { + 0 + } + } else { + limit - existingContainerCountInNs - inProgressContainerCountInNs + } if (capacity <= 0) { stateName match { @@ -79,12 +89,15 @@ class SchedulingDecisionMaker( * * However, if the container exists(totalContainers != 0), the activation is not treated as a failure and the activation is delivered to the container. */ - case Running => + case Running + if !schedulingConfig.allowOverProvisionBeforeThrottle || ceiling( + limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs <= 0 => logging.info( this, s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]") Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0)) - + case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 => + Future.successful(DecisionResults(DisableNamespaceThrottling, 0)) // do nothing case _ => // no need to print any messages if the state is already NamespaceThrottled diff 
--git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala index 090ce3d6e3a..d793c58486f 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala @@ -86,7 +86,7 @@ class MemoryQueueTestsFixture val testNamespace = "test-namespace" val testAction = "test-action" - val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds) + val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5) val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1))) val revision = DocRevision("1-testRev") diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala index ce1c8668233..b70b715542a 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala @@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests val testAction = "test-action" val action = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1))) - val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds) + val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5) it should "decide pausing when the limit is less than equal to 0" in { val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) @@ -149,7 +149,7 @@ class 
SchedulingDecisionMakerTests } } - it should "enable namespace throttling with dropping msg when there is not enough capacity and no container" in { + it should "enable namespace throttling with dropping msg when there is not enough capacity, no container, and namespace over-provision disabled" in { val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) val testProbe = TestProbe() @@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0)) } - it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in { + it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers and namespace over-provision disabled" in { val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) val testProbe = TestProbe() @@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0)) } - it should "add an initial container if there is no any" in { + it should "add one container when there is no container, and namespace over-provision has capacity" in { + val schedulingConfigNamespaceOverProvisioning = + SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5) + val decisionMaker = + system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning)) + val testProbe = TestProbe() + + val msg = QueueSnapshot( + initialized = false, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 0, + existingContainerCount = 0, // there is no container for this action + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 1, // one existing plus one in-progress container already fill the namespace limit of 2 + inProgressContainerCountInNamespace = 1, + 
averageDuration = None, + limit = 2, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // the namespace limit is reached, but over-provisioning still allows one initial container for this action. + testProbe.expectMsg(DecisionResults(AddInitialContainer, 1)) + } + + it should "enable namespace throttling with dropping msg when there is no container, and namespace over-provision has no capacity" in { + val schedulingConfigNamespaceOverProvisioning = + SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0) + val decisionMaker = + system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning)) + val testProbe = TestProbe() + + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 0, + existingContainerCount = 0, // there is no container for this action + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 1, // one existing plus one in-progress container already fill the namespace limit of 2 + inProgressContainerCountInNamespace = 1, + averageDuration = None, + limit = 2, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // this queue cannot create an initial container so enable throttling and drop messages. 
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0)) + } + + it should "disable namespace throttling when namespace over-provision has capacity again" in { + val schedulingConfigNamespaceOverProvisioning = + SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.1) + val decisionMaker = + system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning)) + val testProbe = TestProbe() + + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 0, + existingContainerCount = 1, // there is one container for this action + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 1, // one existing plus one in-progress container already fill the namespace limit of 2 + inProgressContainerCountInNamespace = 1, + averageDuration = None, + limit = 2, + stateName = NamespaceThrottled, + recipient = testProbe.ref) + + decisionMaker ! msg + + // the queue is throttled, but over-provision capacity is available again, so throttling is disabled. 
+ testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0)) + } + + it should "enable namespace throttling without dropping msg when there is a container, and namespace over-provision has no additional capacity" in { + val schedulingConfigNamespaceOverProvisioning = + SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0) + val decisionMaker = + system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning)) + val testProbe = TestProbe() + + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 0, + existingContainerCount = 1, + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 1, // one existing plus one in-progress container already fill the namespace limit of 2 + inProgressContainerCountInNamespace = 1, + averageDuration = None, + limit = 2, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // this queue cannot create an additional container so enable throttling without dropping messages. 
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0)) + } + + it should "not enable namespace throttling when there is not enough capacity but are some containers and namespace over-provision is enabled with capacity" in { + val schedulingConfigNamespaceOverProvisioning = + SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5) + val decisionMaker = + system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning)) + val testProbe = TestProbe() + + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 0, + existingContainerCount = 1, // there are some containers for this action + inProgressContainerCount = 1, + staleActivationNum = 0, + existingContainerCountInNamespace = 2, // but there are already 2 containers in this namespace + inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well. + averageDuration = None, + limit = 4, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // this queue cannot create more containers, but over-provision capacity remains so no throttling decision is sent + testProbe.expectNoMessage() + } + + it should "add an initial container if there is not any" in { val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) val testProbe = TestProbe() @@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests testProbe.expectMsg(DecisionResults(AddInitialContainer, 1)) } + it should "disable the namespace throttling with adding an initial container when there is no container" in { val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) val testProbe = TestProbe()