diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala index 653d3b1a00b..6f31eee80cc 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala @@ -43,13 +43,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None, maxActionLogs: Option[LogLimit] = None, minActionTimeout: Option[TimeLimit] = None, maxActionTimeout: Option[TimeLimit] = None, - minActionConcurrency: Option[ConcurrencyLimit] = None, - maxActionConcurrency: Option[ConcurrencyLimit] = None, + minActionConcurrency: Option[IntraConcurrencyLimit] = None, + maxActionConcurrency: Option[IntraConcurrencyLimit] = None, maxParameterSize: Option[ByteSize] = None, maxPayloadSize: Option[ByteSize] = None, truncationSize: Option[ByteSize] = None, warmedContainerKeepingCount: Option[Int] = None, - warmedContainerKeepingTimeout: Option[String] = None) { + warmedContainerKeepingTimeout: Option[String] = None, + maxActionInstances: Option[Int] = None) { def allowedMaxParameterSize: ByteSize = { val namespaceLimit = maxParameterSize getOrElse (Parameters.MAX_SIZE_DEFAULT) @@ -73,16 +74,16 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None, } def allowedMaxActionConcurrency: Int = { - val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MAX_CONCURRENT_DEFAULT) - if (namespaceLimit > ConcurrencyLimit.MAX_CONCURRENT) { - ConcurrencyLimit.MAX_CONCURRENT + val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (IntraConcurrencyLimit.MAX_CONCURRENT_DEFAULT) + if (namespaceLimit > IntraConcurrencyLimit.MAX_CONCURRENT) { + IntraConcurrencyLimit.MAX_CONCURRENT } else namespaceLimit } def allowedMinActionConcurrency: Int = { - val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MIN_CONCURRENT_DEFAULT) - if (namespaceLimit < ConcurrencyLimit.MIN_CONCURRENT) { - ConcurrencyLimit.MIN_CONCURRENT + val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (IntraConcurrencyLimit.MIN_CONCURRENT_DEFAULT) + if (namespaceLimit < IntraConcurrencyLimit.MIN_CONCURRENT) { + IntraConcurrencyLimit.MIN_CONCURRENT } else namespaceLimit } @@ -127,13 +128,12 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None, TimeLimit.MIN_DURATION } else namespaceLimit } - } object UserLimits extends DefaultJsonProtocol { val standardUserLimits = UserLimits() private implicit val byteSizeSerdes = size.serdes - implicit val serdes = jsonFormat18(UserLimits.apply) + implicit val serdes = jsonFormat19(UserLimits.apply) } protected[core] case class Namespace(name: EntityName, uuid: UUID) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceConcurrencyLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceConcurrencyLimit.scala new file mode 100644 index 00000000000..c6edc88c1f3 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceConcurrencyLimit.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.entity + +import org.apache.openwhisk.http.Messages + +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import spray.json._ + +/** + * InstanceConcurrencyLimit encapsulates max allowed container concurrency for an action within a given namespace. + * A user is given a max concurrency for their entire namespace, but this doesn't allow for any fairness across their actions + * during load spikes. This action limit allows a user to specify max container concurrency for a specific action within the + * constraints of their namespace limit. By default, this limit does not exist and therefore the namespace concurrency limit is used. + * The allowed range is thus [1, namespaceConcurrencyLimit]. If this config is not used by any actions, then the default behavior + * of openwhisk continues in which any action can use the entire concurrency limit of the namespace. The limit less than namespace + * limit check occurs at the api level. + * + * NOTE: This limit is only leveraged on openwhisk v2 with the scheduler service. If this limit is set on a deployment of openwhisk + * not using the scheduler service, the limit will do nothing. + * + * + * @param maxConcurrentInstances the max number of concurrent activations in a single container + */ +protected[entity] class InstanceConcurrencyLimit private(val maxConcurrentInstances: Int) extends AnyVal + +protected[core] object InstanceConcurrencyLimit extends ArgNormalizer[InstanceConcurrencyLimit] { + + /** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */ + protected[core] val MIN_INSTANCES_LIMIT: Int = 0 + + /** + * Creates ContainerConcurrencyLimit for limit, iff limit is within permissible range. + * + * @param maxConcurrenctInstances the limit, must be within permissible range + * @return ConcurrencyLimit with limit set + * @throws IllegalArgumentException if limit does not conform to requirements + */ + @throws[IllegalArgumentException] + protected[core] def apply(maxConcurrenctInstances: Int): InstanceConcurrencyLimit = { + require( + maxConcurrenctInstances >= MIN_INSTANCES_LIMIT, + Messages.belowMinAllowedActionInstanceConcurrency(MIN_INSTANCES_LIMIT)) + new InstanceConcurrencyLimit(maxConcurrenctInstances) + } + + override protected[core] implicit val serdes = new RootJsonFormat[InstanceConcurrencyLimit] { + def write(m: InstanceConcurrencyLimit) = JsNumber(m.maxConcurrentInstances) + + def read(value: JsValue) = { + Try { + val JsNumber(c) = value + require(c.isWhole, "instance concurrency limit must be whole number") + + InstanceConcurrencyLimit(c.toInt) + } match { + case Success(limit) => limit + case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e) + case Failure(e: Throwable) => deserializationError("instance concurrency limit malformed", e) + } + } + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/IntraConcurrencyLimit.scala similarity index 77% rename from common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala rename to common/scala/src/main/scala/org/apache/openwhisk/core/entity/IntraConcurrencyLimit.scala index a6ebddfe394..ccf55347946 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/IntraConcurrencyLimit.scala @@ -28,11 +28,11 @@ import scala.util.Success import scala.util.Try import spray.json._ -case class NamespaceConcurrencyLimitConfig(min: Int, max: Int) -case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int) +case class NamespaceIntraConcurrencyLimitConfig(min: Int, max: Int) +case class IntraConcurrencyLimitConfig(min: Int, max: Int, std: Int) /** - * ConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a + * IntraConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a * permissible range (by default [1, 1]). This default range was chosen intentionally to reflect that concurrency * is disabled by default. * @@ -42,7 +42,7 @@ case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int) * * @param maxConcurrent the max number of concurrent activations in a single container */ -protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extends AnyVal { +protected[entity] class IntraConcurrencyLimit private(val maxConcurrent: Int) extends AnyVal { /** It checks the namespace memory limit setting value */ @throws[ActionConcurrencyLimitException] @@ -60,17 +60,17 @@ protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extend } } -protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] { +protected[core] object IntraConcurrencyLimit extends ArgNormalizer[IntraConcurrencyLimit] { //since tests require override to the default config, load the "test" config, with fallbacks to default val config = ConfigFactory.load().getConfig("test") private val concurrencyConfig = - loadConfigWithFallbackOrThrow[ConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit) + loadConfigWithFallbackOrThrow[IntraConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit) private val namespaceConcurrencyDefaultConfig = try { - loadConfigWithFallbackOrThrow[NamespaceConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit) + loadConfigWithFallbackOrThrow[NamespaceIntraConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit) } catch { case _: Throwable => // Supports backwards compatibility for openwhisk that do not use the namespace default limit - NamespaceConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max) + NamespaceIntraConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max) } /** @@ -91,10 +91,10 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] require(MIN_CONCURRENT <= MIN_CONCURRENT_DEFAULT, "The system min limit must be less than the namespace min limit.") /** A singleton ConcurrencyLimit with default value */ - protected[core] val standardConcurrencyLimit = ConcurrencyLimit(STD_CONCURRENT) + protected[core] val standardConcurrencyLimit = IntraConcurrencyLimit(STD_CONCURRENT) /** Gets ConcurrencyLimit with default value */ - protected[core] def apply(): ConcurrencyLimit = standardConcurrencyLimit + protected[core] def apply(): IntraConcurrencyLimit = standardConcurrencyLimit /** * Creates ConcurrencyLimit for limit, iff limit is within permissible range. @@ -104,19 +104,19 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] * @throws IllegalArgumentException if limit does not conform to requirements */ @throws[IllegalArgumentException] - protected[core] def apply(concurrency: Int): ConcurrencyLimit = { - new ConcurrencyLimit(concurrency) + protected[core] def apply(concurrency: Int): IntraConcurrencyLimit = { + new IntraConcurrencyLimit(concurrency) } - override protected[core] implicit val serdes = new RootJsonFormat[ConcurrencyLimit] { - def write(m: ConcurrencyLimit) = JsNumber(m.maxConcurrent) + override protected[core] implicit val serdes = new RootJsonFormat[IntraConcurrencyLimit] { + def write(m: IntraConcurrencyLimit) = JsNumber(m.maxConcurrent) def read(value: JsValue) = { Try { val JsNumber(c) = value - require(c.isWhole, "concurrency limit must be whole number") + require(c.isWhole, "intra concurrency limit must be whole number") - ConcurrencyLimit(c.toInt) + IntraConcurrencyLimit(c.toInt) } match { case Success(limit) => limit case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala index 8d3b932c620..cbe78547adf 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala @@ -46,11 +46,13 @@ protected[entity] abstract class Limits { * @param memory the memory limit in megabytes, assured to be non-null because it is a value * @param logs the limit for logs written by the container and stored in the activation record, assured to be non-null because it is a value * @param concurrency the limit on concurrently processed activations per container, assured to be non-null because it is a value + * @param instances the limit in which an action can scale up to within the confines of the namespace's concurrency limit */ protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(), memory: MemoryLimit = MemoryLimit(), logs: LogLimit = LogLimit(), - concurrency: ConcurrencyLimit = ConcurrencyLimit()) + concurrency: IntraConcurrencyLimit = IntraConcurrencyLimit(), + instances: Option[InstanceConcurrencyLimit] = None) extends Limits { override protected[entity] def toJson = ActionLimits.serdes.write(this) @@ -73,19 +75,19 @@ protected[core] case class TriggerLimits protected[core] () extends Limits { protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with DefaultJsonProtocol { override protected[core] implicit val serdes = new RootJsonFormat[ActionLimits] { - val helper = jsonFormat4(ActionLimits.apply) + val helper = jsonFormat5(ActionLimits.apply) def read(value: JsValue) = { val obj = Try { value.asJsObject.convertTo[Map[String, JsValue]] } getOrElse deserializationError("no valid json object passed") - val time = TimeLimit.serdes.read(obj.get("timeout") getOrElse deserializationError("'timeout' is missing")) - val memory = MemoryLimit.serdes.read(obj.get("memory") getOrElse deserializationError("'memory' is missing")) - val logs = obj.get("logs") map { LogLimit.serdes.read(_) } getOrElse LogLimit() - val concurrency = obj.get("concurrency") map { ConcurrencyLimit.serdes.read(_) } getOrElse ConcurrencyLimit() - - ActionLimits(time, memory, logs, concurrency) + val time = TimeLimit.serdes.read(obj.getOrElse("timeout", deserializationError("'timeout' is missing"))) + val memory = MemoryLimit.serdes.read(obj.getOrElse("memory", deserializationError("'memory' is missing"))) + val logs = obj.get("logs") map { LogLimit.serdes.read } getOrElse LogLimit() + val concurrency = obj.get("concurrency") map { IntraConcurrencyLimit.serdes.read } getOrElse IntraConcurrencyLimit() + val instances = obj.get("instances") map { InstanceConcurrencyLimit.serdes.read } + ActionLimits(time, memory, logs, concurrency, instances) } def write(a: ActionLimits) = helper.write(a) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala index e06057367c0..d9ebfd16e19 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala @@ -40,7 +40,8 @@ import org.apache.openwhisk.core.entity.types.EntityStore case class ActionLimitsOption(timeout: Option[TimeLimit], memory: Option[MemoryLimit], logs: Option[LogLimit], - concurrency: Option[ConcurrencyLimit]) + concurrency: Option[IntraConcurrencyLimit], + instances: Option[InstanceConcurrencyLimit] = None) /** * WhiskActionPut is a restricted WhiskAction view that eschews properties @@ -647,7 +648,7 @@ object WhiskActionMetaData } object ActionLimitsOption extends DefaultJsonProtocol { - implicit val serdes = jsonFormat4(ActionLimitsOption.apply) + implicit val serdes = jsonFormat5(ActionLimitsOption.apply) } object WhiskActionPut extends DefaultJsonProtocol { diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala index 5e0a838a537..41e2f866175 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala @@ -72,6 +72,12 @@ object Messages { def tooManyConcurrentRequests(count: Int, allowed: Int) = s"Too many concurrent requests in flight (count: $count, allowed: $allowed)." + def maxActionInstanceConcurrencyExceedsNamespace(namespaceConcurrencyLimit: Int) = + s"Max action instance concurrency must not exceed your namespace concurrency of $namespaceConcurrencyLimit." + + def belowMinAllowedActionInstanceConcurrency(minThreshold: Int) = + s"Action container concurrency must be greater than or equal to $minThreshold." + /** System overload message. */ val systemOverloaded = "System is overloaded, try again later." diff --git a/core/controller/src/main/resources/apiv1swagger.json b/core/controller/src/main/resources/apiv1swagger.json index bf2cf35e0c8..3e25b267f20 100644 --- a/core/controller/src/main/resources/apiv1swagger.json +++ b/core/controller/src/main/resources/apiv1swagger.json @@ -1800,8 +1800,13 @@ "concurrency": { "type": "integer", "format": "int32", - "description": "number of concurrent activations allowed", + "description": "number of concurrent activations allowed within an instance", "default": 1 + }, + "instances": { + "type": "integer", + "format": "int32", + "description": "Max number of instances allowed for an action. Must be less than or equal to namespace concurrency limit. Default is the namespace concurrency limit." } } }, @@ -2868,11 +2873,15 @@ }, "minActionConcurrency": { "type": "integer", - "description": "Min number of concurrent activations allowed" + "description": "Min number of concurrent activations within an instance allowed" }, "maxActionConcurrency": { "type": "integer", - "description": "Max number of concurrent activations allowed" + "description": "Max number of concurrent activations within an instance allowed" + }, + "maxActionInstances": { + "type": "integer", + "description": "Max number of concurrent instances allowed for an action" } } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala index cc12388026b..329efbcd8c8 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala @@ -220,7 +220,7 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with onComplete(check) { case Success(_) => - putEntity(WhiskAction, entityStore, entityName.toDocId, overwrite, update(user, request) _, () => { + putEntity(WhiskAction, entityStore, entityName.toDocId, overwrite, update(user, request), () => { make(user, entityName, request) }) case Failure(f) => @@ -455,7 +455,8 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with l.timeout getOrElse TimeLimit(), l.memory getOrElse MemoryLimit(), l.logs getOrElse LogLimit(), - l.concurrency getOrElse ConcurrencyLimit()) + l.concurrency getOrElse IntraConcurrencyLimit(), + l.instances) } getOrElse ActionLimits() // This is temporary while we are making sequencing directly supported in the controller. // The parameter override allows this to work with Pipecode.code. Any parameters other @@ -503,37 +504,41 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with /** Creates a WhiskAction from PUT content, generating default values where necessary. */ private def make(user: Identity, entityName: FullyQualifiedEntityName, content: WhiskActionPut)( implicit transid: TransactionId) = { - content.exec map { - case seq: SequenceExec => - // check that the sequence conforms to max length and no recursion rules - checkSequenceActionLimits(entityName, seq.components) map { _ => - makeWhiskAction(content.replace(seq), entityName) - } - case supportedExec if !supportedExec.deprecated => - Future successful makeWhiskAction(content, entityName) - case deprecatedExec => - Future failed RejectRequest(BadRequest, runtimeDeprecated(deprecatedExec)) + checkInstanceConcurrencyLessThanNamespaceConcurrency(user, content) flatMap { _ => + content.exec map { + case seq: SequenceExec => + // check that the sequence conforms to max length and no recursion rules + checkSequenceActionLimits(entityName, seq.components) map { _ => + makeWhiskAction(content.replace(seq), entityName) + } + case supportedExec if !supportedExec.deprecated => + Future successful makeWhiskAction(content, entityName) + case deprecatedExec => + Future failed RejectRequest(BadRequest, runtimeDeprecated(deprecatedExec)) - } getOrElse Future.failed(RejectRequest(BadRequest, "exec undefined")) + } getOrElse Future.failed(RejectRequest(BadRequest, "exec undefined")) + } } /** Updates a WhiskAction from PUT content, merging old action where necessary. */ private def update(user: Identity, content: WhiskActionPut)(action: WhiskAction)(implicit transid: TransactionId) = { - content.exec map { - case seq: SequenceExec => - // check that the sequence conforms to max length and no recursion rules - checkSequenceActionLimits(FullyQualifiedEntityName(action.namespace, action.name), seq.components) map { _ => - updateWhiskAction(content.replace(seq), action) + checkInstanceConcurrencyLessThanNamespaceConcurrency(user, content) flatMap { _ => + content.exec map { + case seq: SequenceExec => + // check that the sequence conforms to max length and no recursion rules + checkSequenceActionLimits(FullyQualifiedEntityName(action.namespace, action.name), seq.components) map { _ => + updateWhiskAction(content.replace(seq), action) + } + case supportedExec if !supportedExec.deprecated => + Future successful updateWhiskAction(content, action) + case deprecatedExec => + Future failed RejectRequest(BadRequest, runtimeDeprecated(deprecatedExec)) + } getOrElse { + if (!action.exec.deprecated) { + Future successful updateWhiskAction(content, action) + } else { + Future failed RejectRequest(BadRequest, runtimeDeprecated(action.exec)) } - case supportedExec if !supportedExec.deprecated => - Future successful updateWhiskAction(content, action) - case deprecatedExec => - Future failed RejectRequest(BadRequest, runtimeDeprecated(deprecatedExec)) - } getOrElse { - if (!action.exec.deprecated) { - Future successful updateWhiskAction(content, action) - } else { - Future failed RejectRequest(BadRequest, runtimeDeprecated(action.exec)) } } } @@ -547,7 +552,8 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with l.timeout getOrElse action.limits.timeout, l.memory getOrElse action.limits.memory, l.logs getOrElse action.limits.logs, - l.concurrency getOrElse action.limits.concurrency) + l.concurrency getOrElse action.limits.concurrency, + if (l.instances.isDefined) l.instances else action.limits.instances) } getOrElse action.limits // This is temporary while we are making sequencing directly supported in the controller. @@ -684,6 +690,25 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with } } + private def checkInstanceConcurrencyLessThanNamespaceConcurrency(user: Identity, content: WhiskActionPut)( + implicit transid: TransactionId): Future[Unit] = { + val namespaceConcurrencyLimit = + user.limits.concurrentInvocations.getOrElse(whiskConfig.actionInvokeConcurrentLimit.toInt) + content.limits + .map( + l => + l.instances + .map( + m => + if (m.maxConcurrentInstances > namespaceConcurrencyLimit) + Future failed RejectRequest( + BadRequest, + maxActionInstanceConcurrencyExceedsNamespace(namespaceConcurrencyLimit)) + else Future.successful({})) + .getOrElse(Future.successful({}))) + .getOrElse(Future.successful({})) + } + /** * Counts the number of atomic actions in a sequence and checks for potential cycles. The latter is done * by inlining any sequence components that are themselves sequences and checking if there if a reference to diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala index e5fa975538a..d621bc657ad 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala @@ -24,7 +24,7 @@ import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.WhiskConfig import org.apache.openwhisk.core.entitlement.{Collection, Privilege, Resource} import org.apache.openwhisk.core.entitlement.Privilege.READ -import org.apache.openwhisk.core.entity.{ConcurrencyLimit, Identity, LogLimit, MemoryLimit, TimeLimit} +import org.apache.openwhisk.core.entity.{IntraConcurrencyLimit, Identity, LogLimit, MemoryLimit, TimeLimit} trait WhiskLimitsApi extends Directives with AuthenticatedRouteProvider with AuthorizedRouteProvider { @@ -62,9 +62,11 @@ trait WhiskLimitsApi extends Directives with AuthenticatedRouteProvider with Aut minActionLogs = Some(LogLimit(user.limits.allowedMinActionLogs)), maxActionTimeout = Some(TimeLimit(user.limits.allowedMaxActionTimeout)), minActionTimeout = Some(TimeLimit(user.limits.allowedMinActionTimeout)), - maxActionConcurrency = Some(ConcurrencyLimit(user.limits.allowedMaxActionConcurrency)), - minActionConcurrency = Some(ConcurrencyLimit(user.limits.allowedMinActionConcurrency)), - maxParameterSize = Some(user.limits.allowedMaxParameterSize)) + maxActionConcurrency = Some(IntraConcurrencyLimit(user.limits.allowedMaxActionConcurrency)), + minActionConcurrency = Some(IntraConcurrencyLimit(user.limits.allowedMinActionConcurrency)), + maxParameterSize = Some(user.limits.allowedMaxParameterSize), + maxActionInstances = + Some(user.limits.concurrentInvocations.getOrElse(concurrentInvocationsSystemDefault))) pathEndOrSingleSlash { complete(OK, limits) } case _ => reject //should never get here } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala index 4016d0c5596..c1688ac8315 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala @@ -67,7 +67,7 @@ class LeanBalancer(config: WhiskConfig, /** Creates an invoker for executing user actions. There is only one invoker in the lean model. */ private def makeALocalThreadedInvoker(): Unit = { implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() - val limitConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit) + val limitConfig: IntraConcurrencyLimitConfig = loadConfigOrThrow[IntraConcurrencyLimitConfig](ConfigKeys.concurrencyLimit) SpiLoader.get[InvokerProvider].instance(config, invokerName, messageProducer, poolConfig, limitConfig) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala index dd6198a34a6..d8add53613b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala @@ -62,7 +62,7 @@ object FPCInvokerReactive extends InvokerProvider { instance: InvokerInstanceId, producer: MessageProducer, poolConfig: ContainerPoolConfig, - limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore = + limitsConfig: IntraConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore = new FPCInvokerReactive(config, instance, producer, poolConfig, limitsConfig) } @@ -71,7 +71,7 @@ class FPCInvokerReactive(config: WhiskConfig, producer: MessageProducer, poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool), - limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig]( + limitsConfig: IntraConcurrencyLimitConfig = loadConfigOrThrow[IntraConcurrencyLimitConfig]( ConfigKeys.concurrencyLimit))(implicit actorSystem: ActorSystem, logging: Logging) extends InvokerCore { diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 22171fddc18..592809728f7 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -107,7 +107,7 @@ object Invoker { ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec)) implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this)) val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool) - val limitConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit) + val limitConfig: IntraConcurrencyLimitConfig = loadConfigOrThrow[IntraConcurrencyLimitConfig](ConfigKeys.concurrencyLimit) val tags: Seq[String] = Some(loadConfigOrThrow[String](ConfigKeys.invokerResourceTags)) .map(_.trim()) .filter(_ != "") @@ -240,7 +240,7 @@ trait InvokerProvider extends Spi { instance: InvokerInstanceId, producer: MessageProducer, poolConfig: ContainerPoolConfig, - limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore + limitsConfig: IntraConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore } // this trait can be used to add common implementation diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 8d821e42202..17d9c9bb81b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -50,7 +50,7 @@ object InvokerReactive extends InvokerProvider { instance: InvokerInstanceId, producer: MessageProducer, poolConfig: ContainerPoolConfig, - limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore = + limitsConfig: IntraConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore = new InvokerReactive(config, instance, producer, poolConfig, limitsConfig) } @@ -59,7 +59,7 @@ class InvokerReactive( instance: InvokerInstanceId, producer: MessageProducer, poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool), - limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))( + limitsConfig: IntraConcurrencyLimitConfig = loadConfigOrThrow[IntraConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))( implicit actorSystem: ActorSystem, logging: Logging) extends InvokerCore { diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 051e289767e..d316be23c3a 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -930,8 +930,16 @@ class MemoryQueue(private val etcdClient: EtcdClient, if (averageDurationBuffer.nonEmpty) { averageDuration = Some(averageDurationBuffer.average) } + getUserLimit(invocationNamespace).andThen { - case Success(limit) => + case Success(namespaceLimit) => + // extra safeguard to use namespace limit if action limit exceeds due to namespace limit being lowered + // by operator after action is deployed + val actionLimit = actionMetaData.limits.instances + .map(limit => + if (limit.maxConcurrentInstances > namespaceLimit) InstanceConcurrencyLimit(namespaceLimit) else limit) + .getOrElse(InstanceConcurrencyLimit(namespaceLimit)) + .maxConcurrentInstances decisionMaker ! QueueSnapshot( initialized, in, @@ -942,7 +950,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, namespaceContainerCount.existingContainerNumByNamespace, namespaceContainerCount.inProgressContainerNumByNamespace, averageDuration, - limit, + namespaceLimit, + actionLimit, actionMetaData.limits.concurrency.maxConcurrent, stateName, self) @@ -1222,7 +1231,8 @@ case class QueueSnapshot(initialized: Boolean, existingContainerCountInNamespace: Int, inProgressContainerCountInNamespace: Int, averageDuration: Option[Double], - limit: Int, + namespaceLimit: Int, + actionLimit: Int, maxActionConcurrency: Int, stateName: MemoryQueueState, recipient: ActorRef) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala index ab0de2f6af4..61ebfbc5015 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala @@ -57,14 +57,18 @@ class SchedulingDecisionMaker( existingContainerCountInNs, inProgressContainerCountInNs, averageDuration, - limit, + namespaceLimit, + actionLimit, maxActionConcurrency, stateName, _) = snapshot val totalContainers = existing + inProgress val availableMsg = currentMsg + incoming.get() + val actionCapacity = actionLimit - totalContainers + val namespaceCapacity = namespaceLimit - existingContainerCountInNs - inProgressContainerCountInNs + val overProvisionCapacity = ceiling(namespaceLimit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs - if (limit <= 0) { + if (Math.min(namespaceLimit, actionLimit) <= 0) { // this is an error case, the limit should be bigger than 0 stateName match { case Flushing => Future.successful(DecisionResults(Skip, 0)) @@ -74,14 +78,15 @@ class SchedulingDecisionMaker( val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) { // if space available within the over provision ratio amount above namespace limit, create one container for new // action so namespace traffic can attempt to re-balance without blocking entire action - if ((ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs) > 0) { + if (overProvisionCapacity > 0) { 1 } else { 0 } } else { - limit - existingContainerCountInNs - inProgressContainerCountInNs + Math.min(namespaceCapacity, actionCapacity) } + if (capacity <= 0) { stateName match { @@ -91,15 +96,12 @@ class SchedulingDecisionMaker( * However, if the container exists(totalContainers != 0), the activation is not treated as a failure and the activation is delivered to the container. */ case Running - if !schedulingConfig.allowOverProvisionBeforeThrottle || (schedulingConfig.allowOverProvisionBeforeThrottle && ceiling( - limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs <= 0) => + if !schedulingConfig.allowOverProvisionBeforeThrottle || (schedulingConfig.allowOverProvisionBeforeThrottle && overProvisionCapacity <= 0) => logging.info( this, - s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]") + s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, actionLimit: $actionLimit, namespaceLimit: $namespaceLimit, namespaceContainers: $existingContainerCountInNs, namespaceInProgressContainer: $inProgressContainerCountInNs) [$invocationNamespace:$action]") Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0)) - case NamespaceThrottled - if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling( - limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 => + case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && overProvisionCapacity > 0 => Future.successful(DecisionResults(DisableNamespaceThrottling, 0)) // do nothing case _ => @@ -147,6 +149,7 @@ class SchedulingDecisionMaker( 0, availableMsg, capacity, + namespaceCapacity, actualNum, staleActivationNum, 0.0, @@ -177,6 +180,7 @@ class SchedulingDecisionMaker( containerThroughput, availableMsg, capacity, + namespaceCapacity, actualNum + staleContainerProvision, staleActivationNum, duration, @@ -188,6 +192,7 @@ class SchedulingDecisionMaker( containerThroughput, availableMsg, capacity, + namespaceCapacity, staleContainerProvision, staleActivationNum, duration, @@ -212,6 +217,7 @@ class SchedulingDecisionMaker( containerThroughput, availableMsg, capacity, + namespaceCapacity, actualNum, staleActivationNum, duration, @@ -229,6 +235,7 @@ class SchedulingDecisionMaker( 0, availableMsg, capacity, + namespaceCapacity, actualNum, staleActivationNum, 0.0, @@ -247,16 +254,25 @@ class SchedulingDecisionMaker( containerThroughput: Double, availableMsg: Int, capacity: Int, + namespaceCapacity: Int, actualNum: Int, staleActivationNum: Int, duration: Double = 0.0, state: MemoryQueueState) = { if (actualNum > capacity) { - // containers can be partially created. throttling should be enabled - logging.info( - this, - s"[$state] enable namespace throttling and add $capacity container, staleActivationNum: $staleActivationNum, duration: ${duration}, containerThroughput: $containerThroughput, availableMsg: $availableMsg, existing: $existing, inProgress: $inProgress, capacity: $capacity [$invocationNamespace:$action]") - Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = false), capacity)) + if (capacity >= namespaceCapacity) { + // containers can be partially created. throttling should be enabled + logging.info( + this, + s"[$state] enable namespace throttling and add $capacity container, staleActivationNum: $staleActivationNum, duration: $duration, containerThroughput: $containerThroughput, availableMsg: $availableMsg, existing: $existing, inProgress: $inProgress, capacity: $capacity [$invocationNamespace:$action]") + Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = false), capacity)) + } else { + logging.info( + this, + s"[$state] reached max containers allowed for this action adding $capacity containers, but there is still capacity on the namespace so namespace throttling is not turned on." + + s" staleActivationNum: $staleActivationNum, duration: $duration, containerThroughput: $containerThroughput, availableMsg: $availableMsg, existing: $existing, inProgress: $inProgress, capacity: $capacity [$invocationNamespace:$action]") + Future.successful(DecisionResults(AddContainer, capacity)) + } } else if (actualNum <= 0) { // it means nothing Future.successful(DecisionResults(Skip, 0)) @@ -265,7 +281,7 @@ class SchedulingDecisionMaker( // we need to create one more container than expected because existing container would already took the message logging.info( this, - s"[$state]add $actualNum container, staleActivationNum: $staleActivationNum, duration: ${duration}, containerThroughput: $containerThroughput, availableMsg: $availableMsg, existing: $existing, inProgress: $inProgress, capacity: $capacity [$invocationNamespace:$action]") + s"[$state]add $actualNum container, staleActivationNum: $staleActivationNum, duration: $duration, containerThroughput: $containerThroughput, availableMsg: $availableMsg, existing: $existing, inProgress: $inProgress, capacity: $capacity [$invocationNamespace:$action]") Future.successful(DecisionResults(AddContainer, actualNum)) } } diff --git a/docs/actions.md b/docs/actions.md index b68bff0dc23..9f814799f98 100644 --- a/docs/actions.md +++ b/docs/actions.md @@ -44,7 +44,7 @@ advanced topics. * [Deleting actions](#deleting-actions) * [Accessing action metadata within the action body](#accessing-action-metadata-within-the-action-body) * [Securing your action](security.md) -* [Concurrency in actions](concurrency.md) +* [Concurrency in actions](intra-concurrency.md) ## Languages and Runtimes diff --git a/docs/concurrency.md b/docs/intra-concurrency.md similarity index 93% rename from docs/concurrency.md rename to docs/intra-concurrency.md index 2b66f1bc933..9ee445f68fa 100644 --- a/docs/concurrency.md +++ b/docs/intra-concurrency.md @@ -16,9 +16,9 @@ # limitations under the License. # --> -# Concurrency +# Intra Instance Concurrency -Concurrency in actions can improve container reuse, and may be beneficial in cases where: +Concurrency within a container instance of an actions can improve container reuse, and may be beneficial in cases where: * your action can tolerate multiple activations being processed at once * you can rely on external log collection (e.g. via docker log drivers or some decoupled collection process like fluentd) @@ -29,7 +29,7 @@ Concurrent activation processing within the same action container can be enabled * enable the akka http client at invoker config * e.g. CONFIG_whisk_containerPool_akkaClient=true -* use a kind that supports concurrency (currently only `nodejs:14`, and `nodejs:12`) +* use a kind that supports concurrency (**currently only the nodejs family / language**) * enable concurrency at runtime container env (nodejs container only allows concurrency when started with an env var __OW_ALLOW_CONCURRENT=true) * e.g. CONFIG_whisk_containerFactory_containerArgs_extraArgs_env_0="__OW_ALLOW_CONCURRENT=true" * disable log collection at invoker diff --git a/docs/reference.md b/docs/reference.md index 8a928fa9628..b76873e9650 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -72,18 +72,23 @@ OpenWhisk has a few system limits, including how much memory an action can use a **Note:** This default limits are for the open source distribution; production deployments like IBM Cloud Functions likely have higher limits. As an operator or developer you can change some of the limits using [Ansible inventory variables](../ansible/README.md#changing-limits). +**Note:** On Openwhisk 2.0 with the scheduler service, **concurrent** in the table below really means the max containers +that can be provisioned at once for a namespace. The api _may_ be able to accept more activations than this number at once +depending on a number of factors. + The following table lists the default limits for actions. -| limit | description | configurable | unit | default | -| ----- | ----------- | ------------ | -----| ------- | -| timeout | a container is not allowed to run longer than N milliseconds | per action | milliseconds | 60000 | -| memory | a container is not allowed to allocate more than N MB of memory | per action | MB | 256 | -| logs | a container is not allowed to write more than N MB to stdout | per action | MB | 10 | +| limit | description | configurable | unit | default | +| ----- |-------------------------------------------------------------------------------------------| ------------ | -----| ------- | +| timeout | a container is not allowed to run longer than N milliseconds | per action | milliseconds | 60000 | +| memory | a container is not allowed to allocate more than N MB of memory | per action | MB | 256 | +| logs | a container is not allowed to write more than N MB to stdout | per action | MB | 10 | + | instances | an action is not allowed to have more containers than this value (**new scheduler only**) | per action | number | namespace concurrency limit | | concurrent | no more than N activations may be submitted per namespace either executing or queued for execution | per namespace | number | 100 | -| minuteRate | no more than N activations may be submitted per namespace per minute | per namespace | number | 120 | -| codeSize | the maximum size of the action code | configurable, limit per action | MB | 48 | -| parameters | the maximum size of the parameters that can be attached | not configurable, limit per action/package/trigger | MB | 1 | -| result | the maximum size of the action result | not configurable, limit per action | MB | 1 | +| minuteRate | no more than N activations may be submitted per namespace per minute | per namespace | number | 120 | +| codeSize | the maximum size of the action code | configurable, limit per action | MB | 48 | +| parameters | the maximum size of the parameters that can be attached | not configurable, limit per action/package/trigger | MB | 1 | +| result | the maximum size of the action result | not configurable, limit per action | MB | 1 | ### Per action timeout (ms) (Default: 60s) * The timeout limit N is in the range [100ms..300000ms] and is set per action in milliseconds. @@ -95,6 +100,15 @@ The following table lists the default limits for actions. * A user can change the limit when creating the action. * A container cannot have more memory allocated than the limit. +### Per action max instance concurrency (Default: namespace limit for concurrent invocations) **Only applicable using new scheduler** +* The max containers that will be created for an action before throttling in the range from [1..concurrentInvocations limit for namespace] +* By default the max allowed containers / server instances for an action is equal to the namespace limit. +* A user can change the limit when creating the action. +* Defining a lower limit than the namespace limit means your max container concurrency will be the action defined limit. +* If using actionConcurrency > 1 such that your action can handle multiple requests per instance, your true concurrency limit is actionContainerConcurrency * actionConcurrency. +* The actions within a namespaces containerConcurrency total do not have to add up to the namespace limit though you can configure it that way to guarantee an action will get exactly the action container concurrency. +* For example with a namespace limit of 30 with 2 actions each with a container limit of 20; if the first action is using 20, there will still be space for 10 for the other. + ### Per action logs (MB) (Default: 10MB) * The log limit N is in the range [0MB..10MB] and is set per action. * A user can change the limit when creating or updating the action. diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala index eccf04cd784..6f041f8369c 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala @@ -100,7 +100,7 @@ class ContainerPoolTests EntityPath("actionSpace"), EntityName("actionName"), exec, - limits = ActionLimits(concurrency = ConcurrencyLimit(if (concurrencyEnabled) 3 else 1))) + limits = ActionLimits(concurrency = IntraConcurrencyLimit(if (concurrencyEnabled) 3 else 1))) val differentAction = action.copy(name = EntityName("actionName2")) val largeAction = action.copy( @@ -1103,13 +1103,13 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory { val data = warmedData( active = maxConcurrent, - action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))) + action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent)))) val pool = Map('warm -> data) ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe None val data2 = warmedData( active = maxConcurrent - 1, - action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))) + action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent)))) val pool2 = Map('warm -> data2) ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe Some('warm, data2) @@ -1120,7 +1120,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory { val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean) val maxConcurrent = if (concurrencyEnabled) 25 else 1 - val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))) + val action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent))) val data = warmingData(active = maxConcurrent - 1, action = action) val pool = Map('warming -> data) ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warming, data) @@ -1135,7 +1135,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory { val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean) val maxConcurrent = if (concurrencyEnabled) 25 else 1 - val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))) + val action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent))) val data = warmingColdData(active = maxConcurrent - 1, action = action) val data2 = warmedData(active = maxConcurrent - 1, action = action) val pool = Map('warming -> data, 'warm -> data2) @@ -1146,7 +1146,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory { val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean) val maxConcurrent = if (concurrencyEnabled) 25 else 1 - val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))) + val action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent))) val data = warmingColdData(active = maxConcurrent - 1, action = action) val pool = Map('warmingCold -> data) ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warmingCold, data) @@ -1162,7 +1162,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory { val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean) val maxConcurrent = if (concurrencyEnabled) 25 else 1 - val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))) + val action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent))) val data = warmingColdData(active = maxConcurrent - 1, action = action) val data2 = warmedData(active = maxConcurrent - 1, action = action) val pool = Map('warmingCold -> data, 'warm -> data2) @@ -1173,7 +1173,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory { val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean) val maxConcurrent = if (concurrencyEnabled) 25 else 1 - val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))) + val action = createAction(limits = ActionLimits(concurrency = IntraConcurrencyLimit(maxConcurrent))) val data = warmingColdData(active = maxConcurrent - 1, action = action) val data2 = warmingData(active = maxConcurrent - 1, action = action) val pool = Map('warmingCold -> data, 'warming -> data2) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala index 66f7c77392b..03a0e1f1e15 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala @@ -87,7 +87,7 @@ class ContainerProxyTests val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec) val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency", "false")).exists(_.toBoolean) - val testConcurrencyLimit = if (concurrencyEnabled) ConcurrencyLimit(2) else ConcurrencyLimit(1) + val testConcurrencyLimit = if (concurrencyEnabled) IntraConcurrencyLimit(2) else IntraConcurrencyLimit(1) val concurrentAction = ExecutableWhiskAction( EntityPath("actionSpace"), EntityName("actionName"), diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala index d2bfde2fcb9..ef6d8d36a86 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala @@ -627,7 +627,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(is)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -655,7 +655,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(is)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -683,7 +683,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(is)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -711,7 +711,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(is)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -739,7 +739,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(is)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -767,7 +767,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(is)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -795,7 +795,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(is)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -823,7 +823,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(is)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -851,7 +851,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(is)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -864,12 +864,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { it should "reject create when max concurrency is greater than maximum allowed namespace limit" in { implicit val tid = transid() - val allowed = ConcurrencyLimit.MAX_CONCURRENT - 2 - val is = ConcurrencyLimit.MAX_CONCURRENT - 1 + val allowed = IntraConcurrencyLimit.MAX_CONCURRENT - 2 + val is = IntraConcurrencyLimit.MAX_CONCURRENT - 1 val credsWithNamespaceLimits = WhiskAuthHelpers .newIdentity() - .copy(limits = UserLimits(maxActionConcurrency = Some(ConcurrencyLimit(allowed)))) + .copy(limits = UserLimits(maxActionConcurrency = Some(IntraConcurrencyLimit(allowed)))) val content = WhiskActionPut( Some(jsDefault("_")), @@ -879,7 +879,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(is))))) + Some(IntraConcurrencyLimit(is))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -892,12 +892,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { it should "reject create if exceeds the system max concurrency limit and indicate namespace limit in message" in { implicit val tid = transid() - val allowed = ConcurrencyLimit.MAX_CONCURRENT_DEFAULT - 1 - val is = ConcurrencyLimit.MAX_CONCURRENT + 1 + val allowed = IntraConcurrencyLimit.MAX_CONCURRENT_DEFAULT - 1 + val is = IntraConcurrencyLimit.MAX_CONCURRENT + 1 val credsWithNamespaceLimits = WhiskAuthHelpers .newIdentity() - .copy(limits = UserLimits(maxActionConcurrency = Some(ConcurrencyLimit(allowed)))) + .copy(limits = UserLimits(maxActionConcurrency = Some(IntraConcurrencyLimit(allowed)))) val content = WhiskActionPut( Some(jsDefault("_")), @@ -907,7 +907,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(is))))) + Some(IntraConcurrencyLimit(is))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -920,12 +920,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { it should "reject create when max concurrency is less than minimum allowed namespace limit" in { implicit val tid = transid() - val allowed = ConcurrencyLimit.MIN_CONCURRENT + 2 - val is = ConcurrencyLimit.MIN_CONCURRENT + 1 + val allowed = IntraConcurrencyLimit.MIN_CONCURRENT + 2 + val is = IntraConcurrencyLimit.MIN_CONCURRENT + 1 val credsWithNamespaceLimits = WhiskAuthHelpers .newIdentity() - .copy(limits = UserLimits(minActionConcurrency = Some(ConcurrencyLimit(allowed)))) + .copy(limits = UserLimits(minActionConcurrency = Some(IntraConcurrencyLimit(allowed)))) val content = WhiskActionPut( Some(jsDefault("_")), @@ -935,7 +935,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(is))))) + Some(IntraConcurrencyLimit(is))))) Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { status should be(BadRequest) @@ -945,6 +945,32 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { } } + it should "reject create when max instance concurrency is greater than namespace's concurrency" in { + implicit val tid = transid() + + val credsWithNamespaceLimits = WhiskAuthHelpers + .newIdentity() + .copy(limits = UserLimits(concurrentInvocations = Some(30))) + + val content = WhiskActionPut( + Some(jsDefault("_")), + Some(Parameters("x", "X")), + Some( + ActionLimitsOption( + None, + None, + None, + None, + Some(InstanceConcurrencyLimit(40))))) + + Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check { + status should be(BadRequest) + responseAs[String] should include { + Messages.maxActionInstanceConcurrencyExceedsNamespace(30) + } + } + } + it should "reject activation with entity which is too big" in { implicit val tid = transid() val code = "a" * (systemPayloadLimit.toBytes.toInt + 1) @@ -1751,7 +1777,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { Some(TimeLimit(TimeLimit.MAX_DURATION)), Some(MemoryLimit(MemoryLimit.MAX_MEMORY)), Some(LogLimit(LogLimit.MAX_LOGSIZE)), - Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT))))) + Some(IntraConcurrencyLimit(IntraConcurrencyLimit.MAX_CONCURRENT))))) put(entityStore, action) Put(s"$collectionPath/${action.name}?overwrite=true", content) ~> Route.seal(routes(creds)) ~> check { deleteAction(action.docid) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala index 39b8ca74ab7..2d669298012 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala @@ -23,7 +23,7 @@ import akka.http.scaladsl.model.StatusCodes.{BadRequest, MethodNotAllowed, OK} import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller import akka.http.scaladsl.server.Route import org.apache.openwhisk.core.controller.WhiskLimitsApi -import org.apache.openwhisk.core.entity.{ConcurrencyLimit, EntityPath, LogLimit, MemoryLimit, TimeLimit, UserLimits} +import org.apache.openwhisk.core.entity.{IntraConcurrencyLimit, EntityPath, LogLimit, MemoryLimit, TimeLimit, UserLimits} import org.apache.openwhisk.core.entity.size._ import scala.concurrent.duration._ @@ -58,8 +58,8 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi { val testLogMax = LogLimit(6.MB) val testDurationMax = TimeLimit(20.seconds) val testDurationMin = TimeLimit(10.seconds) - val testConcurrencyMax = ConcurrencyLimit(20) - val testConcurrencyMin = ConcurrencyLimit(10) + val testConcurrencyMax = IntraConcurrencyLimit(20) + val testConcurrencyMin = IntraConcurrencyLimit(10) val creds = WhiskAuthHelpers.newIdentity() val credsWithSetLimits = WhiskAuthHelpers @@ -91,6 +91,8 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi { responseAs[UserLimits].invocationsPerMinute shouldBe Some(whiskConfig.actionInvokePerMinuteLimit.toInt) responseAs[UserLimits].concurrentInvocations shouldBe Some(whiskConfig.actionInvokeConcurrentLimit.toInt) responseAs[UserLimits].firesPerMinute shouldBe Some(whiskConfig.triggerFirePerMinuteLimit.toInt) + responseAs[UserLimits].maxActionInstances shouldBe Some(whiskConfig.actionInvokeConcurrentLimit.toInt) + responseAs[UserLimits].allowedKinds shouldBe None responseAs[UserLimits].storeActivations shouldBe None @@ -101,8 +103,8 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi { responseAs[UserLimits].maxActionLogs.get.megabytes shouldBe LogLimit.MAX_LOGSIZE_DEFAULT.toMB responseAs[UserLimits].minActionTimeout.get.duration shouldBe TimeLimit.MIN_DURATION_DEFAULT responseAs[UserLimits].maxActionTimeout.get.duration shouldBe TimeLimit.MAX_DURATION_DEFAULT - responseAs[UserLimits].minActionConcurrency.get.maxConcurrent shouldBe ConcurrencyLimit.MIN_CONCURRENT_DEFAULT - responseAs[UserLimits].maxActionConcurrency.get.maxConcurrent shouldBe ConcurrencyLimit.MAX_CONCURRENT_DEFAULT + responseAs[UserLimits].minActionConcurrency.get.maxConcurrent shouldBe IntraConcurrencyLimit.MIN_CONCURRENT_DEFAULT + responseAs[UserLimits].maxActionConcurrency.get.maxConcurrent shouldBe IntraConcurrencyLimit.MAX_CONCURRENT_DEFAULT } } } @@ -117,6 +119,7 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi { responseAs[UserLimits].firesPerMinute shouldBe Some(testFiresPerMinute) responseAs[UserLimits].allowedKinds shouldBe Some(testAllowedKinds) responseAs[UserLimits].storeActivations shouldBe Some(testStoreActivations) + responseAs[UserLimits].maxActionInstances shouldBe Some(testConcurrent) // provide action limits for namespace responseAs[UserLimits].minActionMemory.get.megabytes shouldBe testMemoryMin.megabytes diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala index 04ecd2a2717..6ada36a575c 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala @@ -124,5 +124,5 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging { } protected def actionLimits(memory: ByteSize, concurrency: Int): ActionLimits = - ActionLimits(memory = MemoryLimit(memory), concurrency = ConcurrencyLimit(concurrency)) + ActionLimits(memory = MemoryLimit(memory), concurrency = IntraConcurrencyLimit(concurrency)) } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala index e03989b9dd4..dc2c8ef4ba9 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala @@ -776,12 +776,12 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson, "memory" -> MemoryLimit.STD_MEMORY.toMB.toInt.toJson, "logs" -> LogLimit.STD_LOGSIZE.toMB.toInt.toJson, - "concurrency" -> ConcurrencyLimit.STD_CONCURRENT.toInt.toJson), + "concurrency" -> IntraConcurrencyLimit.STD_CONCURRENT.toInt.toJson), JsObject( "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson, "memory" -> MemoryLimit.STD_MEMORY.toMB.toInt.toJson, "logs" -> LogLimit.STD_LOGSIZE.toMB.toInt.toJson, - "concurrency" -> ConcurrencyLimit.STD_CONCURRENT.toInt.toJson, + "concurrency" -> IntraConcurrencyLimit.STD_CONCURRENT.toInt.toJson, "foo" -> "bar".toJson), JsObject( "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson, diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala index 989096e55ef..e35b9ec0dcc 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala @@ -43,7 +43,7 @@ import org.apache.openwhisk.core.entity.{ ActivationEntityLimit, ActivationResponse, ByteSize, - ConcurrencyLimit, + IntraConcurrencyLimit, Exec, LogLimit, MemoryLimit, @@ -126,11 +126,11 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSys } val toConcurrencyString = concurrency match { case None => "None" - case Some(ConcurrencyLimit.MIN_CONCURRENT) => s"${ConcurrencyLimit.MIN_CONCURRENT} (= min)" - case Some(ConcurrencyLimit.STD_CONCURRENT) => s"${ConcurrencyLimit.STD_CONCURRENT} (= std)" - case Some(ConcurrencyLimit.MAX_CONCURRENT) => s"${ConcurrencyLimit.MAX_CONCURRENT} (= max)" - case Some(c) if (c < ConcurrencyLimit.MIN_CONCURRENT) => s"${c} (< min)" - case Some(c) if (c > ConcurrencyLimit.MAX_CONCURRENT) => s"${c} (> max)" + case Some(IntraConcurrencyLimit.MIN_CONCURRENT) => s"${IntraConcurrencyLimit.MIN_CONCURRENT} (= min)" + case Some(IntraConcurrencyLimit.STD_CONCURRENT) => s"${IntraConcurrencyLimit.STD_CONCURRENT} (= std)" + case Some(IntraConcurrencyLimit.MAX_CONCURRENT) => s"${IntraConcurrencyLimit.MAX_CONCURRENT} (= max)" + case Some(c) if (c < IntraConcurrencyLimit.MIN_CONCURRENT) => s"${c} (< min)" + case Some(c) if (c > IntraConcurrencyLimit.MAX_CONCURRENT) => s"${c} (> max)" case Some(c) => s"${c} (allowed)" } val toExpectedResultString: String = if (ec == SUCCESS_EXIT) "allow" else "reject" @@ -143,10 +143,10 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSys time <- Seq(None, Some(TimeLimit.MIN_DURATION), Some(TimeLimit.MAX_DURATION)) mem <- Seq(None, Some(MemoryLimit.MIN_MEMORY), Some(MemoryLimit.MAX_MEMORY)) log <- Seq(None, Some(LogLimit.MIN_LOGSIZE), Some(LogLimit.MAX_LOGSIZE)) - concurrency <- if (!concurrencyEnabled || (ConcurrencyLimit.MIN_CONCURRENT == ConcurrencyLimit.MAX_CONCURRENT)) { - Seq(None, Some(ConcurrencyLimit.MIN_CONCURRENT)) + concurrency <- if (!concurrencyEnabled || (IntraConcurrencyLimit.MIN_CONCURRENT == IntraConcurrencyLimit.MAX_CONCURRENT)) { + Seq(None, Some(IntraConcurrencyLimit.MIN_CONCURRENT)) } else { - Seq(None, Some(ConcurrencyLimit.MIN_CONCURRENT), Some(ConcurrencyLimit.MAX_CONCURRENT)) + Seq(None, Some(IntraConcurrencyLimit.MIN_CONCURRENT), Some(IntraConcurrencyLimit.MAX_CONCURRENT)) } } yield PermutationTestParameter(time, mem, log, concurrency) } ++ @@ -181,7 +181,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSys "timeout" -> parm.timeout.getOrElse(TimeLimit.STD_DURATION).toMillis.toJson, "memory" -> parm.memory.getOrElse(MemoryLimit.STD_MEMORY).toMB.toInt.toJson, "logs" -> parm.logs.getOrElse(LogLimit.STD_LOGSIZE).toMB.toInt.toJson, - "concurrency" -> parm.concurrency.getOrElse(ConcurrencyLimit.STD_CONCURRENT).toJson) + "concurrency" -> parm.concurrency.getOrElse(IntraConcurrencyLimit.STD_CONCURRENT).toJson) val name = "ActionLimitTests-" + Instant.now.toEpochMilli val createResult = assetHelper.withCleaner(wsk.action, name, confirmDelete = (parm.ec == SUCCESS_EXIT)) { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala index 9a235a99094..6bcc3d21d2a 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala @@ -313,7 +313,7 @@ class MemoryQueueFlowTests // this is the case where there is no capacity in a namespace and no container can be created. decisionMaker.setAutoPilot((sender: ActorRef, msg) => { msg match { - case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) => + case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, _, Running, _) => sender ! DecisionResults(EnableNamespaceThrottling(true), 0) TestActor.KeepRunning diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index c2c5ae34003..3d7d2fc7eee 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -1534,7 +1534,7 @@ class MemoryQueueTests // This test pilot mimic the decision maker who disable the namespace throttling when there is enough capacity. decisionMaker.setAutoPilot((sender: ActorRef, msg) => { msg match { - case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) => + case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) => sender ! DisableNamespaceThrottling case _ => diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala index edd0783aaa0..386523b1e9d 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala @@ -63,7 +63,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 0, inProgressContainerCountInNamespace = 0, averageDuration = None, - limit = 0, // limit is 0, + namespaceLimit = 0, + actionLimit = 0, // limit is 0, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -87,7 +88,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 0, inProgressContainerCountInNamespace = 0, averageDuration = None, - limit = 0, // limit is 0 + namespaceLimit = 0, + actionLimit = 0, // limit is 0 maxActionConcurrency = 1, stateName = Flushing, recipient = testProbe.ref) @@ -115,7 +117,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 2, averageDuration = None, // No average duration available - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = state, recipient = testProbe.ref) @@ -142,7 +145,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 5, inProgressContainerCountInNamespace = 8, averageDuration = Some(1.0), // Some average duration available - limit = 20, + namespaceLimit = 20, + actionLimit = 20, maxActionConcurrency = 1, stateName = state, recipient = testProbe.ref) @@ -167,7 +171,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 2, + namespaceLimit = 2, + actionLimit = 2, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -192,7 +197,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well. averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -220,7 +226,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 2, + namespaceLimit = 2, + actionLimit = 2, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -248,7 +255,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 2, + namespaceLimit = 2, + actionLimit = 2, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -276,7 +284,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 2, + namespaceLimit = 2, + actionLimit = 2, maxActionConcurrency = 1, stateName = NamespaceThrottled, recipient = testProbe.ref) @@ -304,7 +313,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 2, + namespaceLimit = 2, + actionLimit = 2, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -332,7 +342,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, // but there are already 2 containers in this namespace inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well. averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -357,7 +368,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -381,7 +393,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = NamespaceThrottled, recipient = testProbe.ref) @@ -405,7 +418,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = NamespaceThrottled, recipient = testProbe.ref) @@ -430,7 +444,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -455,7 +470,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -480,7 +496,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Flushing, recipient = testProbe.ref) @@ -504,7 +521,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Flushing, recipient = testProbe.ref) @@ -528,7 +546,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -552,7 +571,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -576,7 +596,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = None, - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -600,7 +621,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 6, averageDuration = None, - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -624,7 +646,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 0, averageDuration = Some(50), // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -650,7 +673,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -676,7 +700,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -704,7 +729,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -731,7 +757,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -759,7 +786,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 5, inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -786,7 +814,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 0, inProgressContainerCountInNamespace = 0, averageDuration = None, // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -810,7 +839,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 1, averageDuration = Some(1000), // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Removing, recipient = testProbe.ref) @@ -837,7 +867,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 2, averageDuration = None, // the average duration does not exist - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Removing, recipient = testProbe.ref) @@ -864,7 +895,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -889,7 +921,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, inProgressContainerCountInNamespace = 0, averageDuration = Some(50), // the average duration gives container throughput of 2 - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -914,7 +947,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 1, inProgressContainerCountInNamespace = 2, averageDuration = None, // the average duration does not exist - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 1, stateName = Removing, recipient = testProbe.ref) @@ -943,7 +977,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, inProgressContainerCountInNamespace = 0, averageDuration = Some(100.0), - limit = 4, + namespaceLimit = 4, + actionLimit = 4, maxActionConcurrency = 2, stateName = Running, recipient = testProbe.ref) @@ -970,7 +1005,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, inProgressContainerCountInNamespace = 0, averageDuration = Some(50.0), - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 3, stateName = Running, recipient = testProbe.ref) @@ -997,7 +1033,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, inProgressContainerCountInNamespace = 0, averageDuration = Some(50.0), - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 3, stateName = Running, recipient = testProbe.ref) @@ -1025,7 +1062,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 2, inProgressContainerCountInNamespace = 0, averageDuration = None, - limit = 10, + namespaceLimit = 10, + actionLimit = 10, maxActionConcurrency = 3, stateName = Running, recipient = testProbe.ref) @@ -1036,4 +1074,60 @@ class SchedulingDecisionMakerTests // 10 / 3 = 4.0 testProbe.expectMsg(DecisionResults(AddContainer, 4)) } + + it should "add only up to the action container limit if less than the namespace limit" in { + val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) + val testProbe = TestProbe() + + // container + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 100, + existingContainerCount = 1, + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 2, + inProgressContainerCountInNamespace = 0, + averageDuration = Some(100.0), + namespaceLimit = 10, + actionLimit = 5, + maxActionConcurrency = 3, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // one container already exists with an action limit of 5. Number of messages will exceed limit of containers + // so use smaller of the two limits + testProbe.expectMsg(DecisionResults(AddContainer, 4)) + } + + it should "add only up to the namespace limit total if existing containers in namespace prevents reaching action limit" in { + val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) + val testProbe = TestProbe() + + // container + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 100, + existingContainerCount = 1, + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 7, + inProgressContainerCountInNamespace = 0, + averageDuration = Some(100.0), + namespaceLimit = 10, + actionLimit = 5, + maxActionConcurrency = 3, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // one container already exists with an action limit of 5. There are currently 7 containers in namespace + // so can only add 3 more even if that only gives this action 4 containers when it has an action limit of 5 + testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(false), 3)) + } }