Skip to content

User Defined Action Instance Concurrency Limits #5287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
maxActionLogs: Option[LogLimit] = None,
minActionTimeout: Option[TimeLimit] = None,
maxActionTimeout: Option[TimeLimit] = None,
minActionConcurrency: Option[ConcurrencyLimit] = None,
maxActionConcurrency: Option[ConcurrencyLimit] = None,
minActionConcurrency: Option[IntraConcurrencyLimit] = None,
maxActionConcurrency: Option[IntraConcurrencyLimit] = None,
maxParameterSize: Option[ByteSize] = None,
maxPayloadSize: Option[ByteSize] = None,
truncationSize: Option[ByteSize] = None,
warmedContainerKeepingCount: Option[Int] = None,
warmedContainerKeepingTimeout: Option[String] = None) {
warmedContainerKeepingTimeout: Option[String] = None,
maxActionInstances: Option[Int] = None) {

def allowedMaxParameterSize: ByteSize = {
val namespaceLimit = maxParameterSize getOrElse (Parameters.MAX_SIZE_DEFAULT)
Expand All @@ -73,16 +74,16 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
}

def allowedMaxActionConcurrency: Int = {
val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MAX_CONCURRENT_DEFAULT)
if (namespaceLimit > ConcurrencyLimit.MAX_CONCURRENT) {
ConcurrencyLimit.MAX_CONCURRENT
val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (IntraConcurrencyLimit.MAX_CONCURRENT_DEFAULT)
if (namespaceLimit > IntraConcurrencyLimit.MAX_CONCURRENT) {
IntraConcurrencyLimit.MAX_CONCURRENT
} else namespaceLimit
}

def allowedMinActionConcurrency: Int = {
val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MIN_CONCURRENT_DEFAULT)
if (namespaceLimit < ConcurrencyLimit.MIN_CONCURRENT) {
ConcurrencyLimit.MIN_CONCURRENT
val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (IntraConcurrencyLimit.MIN_CONCURRENT_DEFAULT)
if (namespaceLimit < IntraConcurrencyLimit.MIN_CONCURRENT) {
IntraConcurrencyLimit.MIN_CONCURRENT
} else namespaceLimit
}

Expand Down Expand Up @@ -127,13 +128,12 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
TimeLimit.MIN_DURATION
} else namespaceLimit
}

}

object UserLimits extends DefaultJsonProtocol {
val standardUserLimits = UserLimits()
private implicit val byteSizeSerdes = size.serdes
implicit val serdes = jsonFormat18(UserLimits.apply)
implicit val serdes = jsonFormat19(UserLimits.apply)
}

protected[core] case class Namespace(name: EntityName, uuid: UUID)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.entity

import org.apache.openwhisk.http.Messages

import scala.util.Failure
import scala.util.Success
import scala.util.Try
import spray.json._

/**
* InstanceConcurrencyLimit encapsulates max allowed container concurrency for an action within a given namespace.
* A user is given a max concurrency for their entire namespace, but this doesn't allow for any fairness across their actions
* during load spikes. This action limit allows a user to specify max container concurrency for a specific action within the
* constraints of their namespace limit. By default, this limit does not exist and therefore the namespace concurrency limit is used.
* The allowed range is thus [1, namespaceConcurrencyLimit]. If this config is not used by any actions, then the default behavior
* of openwhisk continues in which any action can use the entire concurrency limit of the namespace. The limit less than namespace
* limit check occurs at the api level.
*
* NOTE: This limit is only leveraged on openwhisk v2 with the scheduler service. If this limit is set on a deployment of openwhisk
* not using the scheduler service, the limit will do nothing.
*
*
* @param maxConcurrentInstances the max number of concurrent activations in a single container
*/
protected[entity] class InstanceConcurrencyLimit private(val maxConcurrentInstances: Int) extends AnyVal

protected[core] object InstanceConcurrencyLimit extends ArgNormalizer[InstanceConcurrencyLimit] {

/** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */
protected[core] val MIN_INSTANCES_LIMIT: Int = 0

/**
* Creates ContainerConcurrencyLimit for limit, iff limit is within permissible range.
*
* @param maxConcurrenctInstances the limit, must be within permissible range
* @return ConcurrencyLimit with limit set
* @throws IllegalArgumentException if limit does not conform to requirements
*/
@throws[IllegalArgumentException]
protected[core] def apply(maxConcurrenctInstances: Int): InstanceConcurrencyLimit = {
require(
maxConcurrenctInstances >= MIN_INSTANCES_LIMIT,
Messages.belowMinAllowedActionInstanceConcurrency(MIN_INSTANCES_LIMIT))
new InstanceConcurrencyLimit(maxConcurrenctInstances)
}

override protected[core] implicit val serdes = new RootJsonFormat[InstanceConcurrencyLimit] {
def write(m: InstanceConcurrencyLimit) = JsNumber(m.maxConcurrentInstances)

def read(value: JsValue) = {
Try {
val JsNumber(c) = value
require(c.isWhole, "container concurrency limit must be whole number")

InstanceConcurrencyLimit(c.toInt)
} match {
case Success(limit) => limit
case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e)
case Failure(e: Throwable) => deserializationError("container concurrency limit malformed", e)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import scala.util.Success
import scala.util.Try
import spray.json._

case class NamespaceConcurrencyLimitConfig(min: Int, max: Int)
case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
case class NamespaceIntraConcurrencyLimitConfig(min: Int, max: Int)
case class IntraConcurrencyLimitConfig(min: Int, max: Int, std: Int)

/**
* ConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a
* IntraConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a
* permissible range (by default [1, 1]). This default range was chosen intentionally to reflect that concurrency
* is disabled by default.
*
Expand All @@ -42,7 +42,7 @@ case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
*
* @param maxConcurrent the max number of concurrent activations in a single container
*/
protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extends AnyVal {
protected[entity] class IntraConcurrencyLimit private(val maxConcurrent: Int) extends AnyVal {

/** It checks the namespace memory limit setting value */
@throws[ActionConcurrencyLimitException]
Expand All @@ -60,17 +60,17 @@ protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extend
}
}

protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] {
protected[core] object IntraConcurrencyLimit extends ArgNormalizer[IntraConcurrencyLimit] {
//since tests require override to the default config, load the "test" config, with fallbacks to default
val config = ConfigFactory.load().getConfig("test")
private val concurrencyConfig =
loadConfigWithFallbackOrThrow[ConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit)
loadConfigWithFallbackOrThrow[IntraConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit)
private val namespaceConcurrencyDefaultConfig = try {
loadConfigWithFallbackOrThrow[NamespaceConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit)
loadConfigWithFallbackOrThrow[NamespaceIntraConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit)
} catch {
case _: Throwable =>
// Supports backwards compatibility for openwhisk that do not use the namespace default limit
NamespaceConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max)
NamespaceIntraConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max)
}

/**
Expand All @@ -91,10 +91,10 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit]
require(MIN_CONCURRENT <= MIN_CONCURRENT_DEFAULT, "The system min limit must be less than the namespace min limit.")

/** A singleton ConcurrencyLimit with default value */
protected[core] val standardConcurrencyLimit = ConcurrencyLimit(STD_CONCURRENT)
protected[core] val standardConcurrencyLimit = IntraConcurrencyLimit(STD_CONCURRENT)

/** Gets ConcurrencyLimit with default value */
protected[core] def apply(): ConcurrencyLimit = standardConcurrencyLimit
protected[core] def apply(): IntraConcurrencyLimit = standardConcurrencyLimit

/**
* Creates ConcurrencyLimit for limit, iff limit is within permissible range.
Expand All @@ -104,19 +104,19 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit]
* @throws IllegalArgumentException if limit does not conform to requirements
*/
@throws[IllegalArgumentException]
protected[core] def apply(concurrency: Int): ConcurrencyLimit = {
new ConcurrencyLimit(concurrency)
protected[core] def apply(concurrency: Int): IntraConcurrencyLimit = {
new IntraConcurrencyLimit(concurrency)
}

override protected[core] implicit val serdes = new RootJsonFormat[ConcurrencyLimit] {
def write(m: ConcurrencyLimit) = JsNumber(m.maxConcurrent)
override protected[core] implicit val serdes = new RootJsonFormat[IntraConcurrencyLimit] {
def write(m: IntraConcurrencyLimit) = JsNumber(m.maxConcurrent)

def read(value: JsValue) = {
Try {
val JsNumber(c) = value
require(c.isWhole, "concurrency limit must be whole number")
require(c.isWhole, "intra concurrency limit must be whole number")

ConcurrencyLimit(c.toInt)
IntraConcurrencyLimit(c.toInt)
} match {
case Success(limit) => limit
case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ protected[entity] abstract class Limits {
* @param memory the memory limit in megabytes, assured to be non-null because it is a value
* @param logs the limit for logs written by the container and stored in the activation record, assured to be non-null because it is a value
* @param concurrency the limit on concurrently processed activations per container, assured to be non-null because it is a value
* @param instances the limit in which an action can scale up to within the confines of the namespace's concurrency limit
*/
protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(),
memory: MemoryLimit = MemoryLimit(),
logs: LogLimit = LogLimit(),
concurrency: ConcurrencyLimit = ConcurrencyLimit())
concurrency: IntraConcurrencyLimit = IntraConcurrencyLimit(),
instances: Option[InstanceConcurrencyLimit] = None)
extends Limits {
override protected[entity] def toJson = ActionLimits.serdes.write(this)

Expand All @@ -73,19 +75,19 @@ protected[core] case class TriggerLimits protected[core] () extends Limits {
protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with DefaultJsonProtocol {

override protected[core] implicit val serdes = new RootJsonFormat[ActionLimits] {
val helper = jsonFormat4(ActionLimits.apply)
val helper = jsonFormat5(ActionLimits.apply)

def read(value: JsValue) = {
val obj = Try {
value.asJsObject.convertTo[Map[String, JsValue]]
} getOrElse deserializationError("no valid json object passed")

val time = TimeLimit.serdes.read(obj.get("timeout") getOrElse deserializationError("'timeout' is missing"))
val memory = MemoryLimit.serdes.read(obj.get("memory") getOrElse deserializationError("'memory' is missing"))
val logs = obj.get("logs") map { LogLimit.serdes.read(_) } getOrElse LogLimit()
val concurrency = obj.get("concurrency") map { ConcurrencyLimit.serdes.read(_) } getOrElse ConcurrencyLimit()

ActionLimits(time, memory, logs, concurrency)
val time = TimeLimit.serdes.read(obj.getOrElse("timeout", deserializationError("'timeout' is missing")))
val memory = MemoryLimit.serdes.read(obj.getOrElse("memory", deserializationError("'memory' is missing")))
val logs = obj.get("logs") map { LogLimit.serdes.read } getOrElse LogLimit()
val concurrency = obj.get("concurrency") map { IntraConcurrencyLimit.serdes.read } getOrElse IntraConcurrencyLimit()
val instances = obj.get("instances") map { InstanceConcurrencyLimit.serdes.read }
ActionLimits(time, memory, logs, concurrency, instances)
}

def write(a: ActionLimits) = helper.write(a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import org.apache.openwhisk.core.entity.types.EntityStore
case class ActionLimitsOption(timeout: Option[TimeLimit],
memory: Option[MemoryLimit],
logs: Option[LogLimit],
concurrency: Option[ConcurrencyLimit])
concurrency: Option[IntraConcurrencyLimit],
containerConcurrency: Option[InstanceConcurrencyLimit] = None)

/**
* WhiskActionPut is a restricted WhiskAction view that eschews properties
Expand Down Expand Up @@ -647,7 +648,7 @@ object WhiskActionMetaData
}

object ActionLimitsOption extends DefaultJsonProtocol {
implicit val serdes = jsonFormat4(ActionLimitsOption.apply)
implicit val serdes = jsonFormat5(ActionLimitsOption.apply)
}

object WhiskActionPut extends DefaultJsonProtocol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ object Messages {
def tooManyConcurrentRequests(count: Int, allowed: Int) =
s"Too many concurrent requests in flight (count: $count, allowed: $allowed)."

def maxActionInstanceConcurrencyExceedsNamespace(namespaceConcurrencyLimit: Int) =
s"Max action instance concurrency must not exceed your namespace concurrency of $namespaceConcurrencyLimit."

def belowMinAllowedActionInstanceConcurrency(minThreshold: Int) =
s"Action container concurrency must be greater than or equal to $minThreshold."

/** System overload message. */
val systemOverloaded = "System is overloaded, try again later."

Expand Down
15 changes: 12 additions & 3 deletions core/controller/src/main/resources/apiv1swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1800,8 +1800,13 @@
"concurrency": {
"type": "integer",
"format": "int32",
"description": "number of concurrent activations allowed",
"description": "number of concurrent activations allowed within an instance",
"default": 1
},
"instances": {
"type": "integer",
"format": "int32",
"description": "Max number of instances allowed for an action. Must be less than or equal to namespace concurrency limit. Default is the namespace concurrency limit."
}
}
},
Expand Down Expand Up @@ -2868,11 +2873,15 @@
},
"minActionConcurrency": {
"type": "integer",
"description": "Min number of concurrent activations allowed"
"description": "Min number of concurrent activations within an instance allowed"
},
"maxActionConcurrency": {
"type": "integer",
"description": "Max number of concurrent activations allowed"
"description": "Max number of concurrent activations within an instance allowed"
},
"maxActionInstances": {
"type": "integer",
"description": "Max number of concurrent instances allowed for an action"
}
}
}
Expand Down
Loading