Skip to content

Commit 72bb2a1

Browse files
bdoyle0182Brendan Doyle
andauthored
User Defined Action Instance Concurrency Limits (#5287)
* working prototype * consider when to turn on namespace throttling * tests and final cleanup * update swagger * fix container concurrency field * fix tests * renaming * update docs * more cleanup --------- Co-authored-by: Brendan Doyle <[email protected]>
1 parent be8ac20 commit 72bb2a1

File tree

28 files changed

+488
-200
lines changed

28 files changed

+488
-200
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
4343
maxActionLogs: Option[LogLimit] = None,
4444
minActionTimeout: Option[TimeLimit] = None,
4545
maxActionTimeout: Option[TimeLimit] = None,
46-
minActionConcurrency: Option[ConcurrencyLimit] = None,
47-
maxActionConcurrency: Option[ConcurrencyLimit] = None,
46+
minActionConcurrency: Option[IntraConcurrencyLimit] = None,
47+
maxActionConcurrency: Option[IntraConcurrencyLimit] = None,
4848
maxParameterSize: Option[ByteSize] = None,
4949
maxPayloadSize: Option[ByteSize] = None,
5050
truncationSize: Option[ByteSize] = None,
5151
warmedContainerKeepingCount: Option[Int] = None,
52-
warmedContainerKeepingTimeout: Option[String] = None) {
52+
warmedContainerKeepingTimeout: Option[String] = None,
53+
maxActionInstances: Option[Int] = None) {
5354

5455
def allowedMaxParameterSize: ByteSize = {
5556
val namespaceLimit = maxParameterSize getOrElse (Parameters.MAX_SIZE_DEFAULT)
@@ -73,16 +74,16 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
7374
}
7475

7576
def allowedMaxActionConcurrency: Int = {
76-
val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MAX_CONCURRENT_DEFAULT)
77-
if (namespaceLimit > ConcurrencyLimit.MAX_CONCURRENT) {
78-
ConcurrencyLimit.MAX_CONCURRENT
77+
val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (IntraConcurrencyLimit.MAX_CONCURRENT_DEFAULT)
78+
if (namespaceLimit > IntraConcurrencyLimit.MAX_CONCURRENT) {
79+
IntraConcurrencyLimit.MAX_CONCURRENT
7980
} else namespaceLimit
8081
}
8182

8283
def allowedMinActionConcurrency: Int = {
83-
val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MIN_CONCURRENT_DEFAULT)
84-
if (namespaceLimit < ConcurrencyLimit.MIN_CONCURRENT) {
85-
ConcurrencyLimit.MIN_CONCURRENT
84+
val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (IntraConcurrencyLimit.MIN_CONCURRENT_DEFAULT)
85+
if (namespaceLimit < IntraConcurrencyLimit.MIN_CONCURRENT) {
86+
IntraConcurrencyLimit.MIN_CONCURRENT
8687
} else namespaceLimit
8788
}
8889

@@ -127,13 +128,12 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
127128
TimeLimit.MIN_DURATION
128129
} else namespaceLimit
129130
}
130-
131131
}
132132

133133
object UserLimits extends DefaultJsonProtocol {
134134
val standardUserLimits = UserLimits()
135135
private implicit val byteSizeSerdes = size.serdes
136-
implicit val serdes = jsonFormat18(UserLimits.apply)
136+
implicit val serdes = jsonFormat19(UserLimits.apply)
137137
}
138138

139139
protected[core] case class Namespace(name: EntityName, uuid: UUID)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.entity
19+
20+
import org.apache.openwhisk.http.Messages
21+
22+
import scala.util.Failure
23+
import scala.util.Success
24+
import scala.util.Try
25+
import spray.json._
26+
27+
/**
28+
* InstanceConcurrencyLimit encapsulates max allowed container concurrency for an action within a given namespace.
29+
* A user is given a max concurrency for their entire namespace, but this doesn't allow for any fairness across their actions
30+
* during load spikes. This action limit allows a user to specify max container concurrency for a specific action within the
31+
* constraints of their namespace limit. By default, this limit does not exist and therefore the namespace concurrency limit is used.
32+
* The allowed range is thus [1, namespaceConcurrencyLimit]. If this config is not used by any actions, then the default behavior
33+
* of openwhisk continues in which any action can use the entire concurrency limit of the namespace. The limit less than namespace
34+
* limit check occurs at the api level.
35+
*
36+
* NOTE: This limit is only leveraged on openwhisk v2 with the scheduler service. If this limit is set on a deployment of openwhisk
37+
* not using the scheduler service, the limit will do nothing.
38+
*
39+
*
40+
* @param maxConcurrentInstances the max number of concurrent activations in a single container
41+
*/
42+
protected[entity] class InstanceConcurrencyLimit private(val maxConcurrentInstances: Int) extends AnyVal
43+
44+
protected[core] object InstanceConcurrencyLimit extends ArgNormalizer[InstanceConcurrencyLimit] {
45+
46+
/** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */
47+
protected[core] val MIN_INSTANCES_LIMIT: Int = 0
48+
49+
/**
50+
* Creates ContainerConcurrencyLimit for limit, iff limit is within permissible range.
51+
*
52+
* @param maxConcurrenctInstances the limit, must be within permissible range
53+
* @return ConcurrencyLimit with limit set
54+
* @throws IllegalArgumentException if limit does not conform to requirements
55+
*/
56+
@throws[IllegalArgumentException]
57+
protected[core] def apply(maxConcurrenctInstances: Int): InstanceConcurrencyLimit = {
58+
require(
59+
maxConcurrenctInstances >= MIN_INSTANCES_LIMIT,
60+
Messages.belowMinAllowedActionInstanceConcurrency(MIN_INSTANCES_LIMIT))
61+
new InstanceConcurrencyLimit(maxConcurrenctInstances)
62+
}
63+
64+
override protected[core] implicit val serdes = new RootJsonFormat[InstanceConcurrencyLimit] {
65+
def write(m: InstanceConcurrencyLimit) = JsNumber(m.maxConcurrentInstances)
66+
67+
def read(value: JsValue) = {
68+
Try {
69+
val JsNumber(c) = value
70+
require(c.isWhole, "instance concurrency limit must be whole number")
71+
72+
InstanceConcurrencyLimit(c.toInt)
73+
} match {
74+
case Success(limit) => limit
75+
case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e)
76+
case Failure(e: Throwable) => deserializationError("instance concurrency limit malformed", e)
77+
}
78+
}
79+
}
80+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala renamed to common/scala/src/main/scala/org/apache/openwhisk/core/entity/IntraConcurrencyLimit.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ import scala.util.Success
2828
import scala.util.Try
2929
import spray.json._
3030

31-
case class NamespaceConcurrencyLimitConfig(min: Int, max: Int)
32-
case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
31+
case class NamespaceIntraConcurrencyLimitConfig(min: Int, max: Int)
32+
case class IntraConcurrencyLimitConfig(min: Int, max: Int, std: Int)
3333

3434
/**
35-
* ConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a
35+
* IntraConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a
3636
* permissible range (by default [1, 1]). This default range was chosen intentionally to reflect that concurrency
3737
* is disabled by default.
3838
*
@@ -42,7 +42,7 @@ case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
4242
*
4343
* @param maxConcurrent the max number of concurrent activations in a single container
4444
*/
45-
protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extends AnyVal {
45+
protected[entity] class IntraConcurrencyLimit private(val maxConcurrent: Int) extends AnyVal {
4646

4747
/** It checks the namespace memory limit setting value */
4848
@throws[ActionConcurrencyLimitException]
@@ -60,17 +60,17 @@ protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extend
6060
}
6161
}
6262

63-
protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] {
63+
protected[core] object IntraConcurrencyLimit extends ArgNormalizer[IntraConcurrencyLimit] {
6464
//since tests require override to the default config, load the "test" config, with fallbacks to default
6565
val config = ConfigFactory.load().getConfig("test")
6666
private val concurrencyConfig =
67-
loadConfigWithFallbackOrThrow[ConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit)
67+
loadConfigWithFallbackOrThrow[IntraConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit)
6868
private val namespaceConcurrencyDefaultConfig = try {
69-
loadConfigWithFallbackOrThrow[NamespaceConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit)
69+
loadConfigWithFallbackOrThrow[NamespaceIntraConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit)
7070
} catch {
7171
case _: Throwable =>
7272
// Supports backwards compatibility for openwhisk that do not use the namespace default limit
73-
NamespaceConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max)
73+
NamespaceIntraConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max)
7474
}
7575

7676
/**
@@ -91,10 +91,10 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit]
9191
require(MIN_CONCURRENT <= MIN_CONCURRENT_DEFAULT, "The system min limit must be less than the namespace min limit.")
9292

9393
/** A singleton ConcurrencyLimit with default value */
94-
protected[core] val standardConcurrencyLimit = ConcurrencyLimit(STD_CONCURRENT)
94+
protected[core] val standardConcurrencyLimit = IntraConcurrencyLimit(STD_CONCURRENT)
9595

9696
/** Gets ConcurrencyLimit with default value */
97-
protected[core] def apply(): ConcurrencyLimit = standardConcurrencyLimit
97+
protected[core] def apply(): IntraConcurrencyLimit = standardConcurrencyLimit
9898

9999
/**
100100
* Creates ConcurrencyLimit for limit, iff limit is within permissible range.
@@ -104,19 +104,19 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit]
104104
* @throws IllegalArgumentException if limit does not conform to requirements
105105
*/
106106
@throws[IllegalArgumentException]
107-
protected[core] def apply(concurrency: Int): ConcurrencyLimit = {
108-
new ConcurrencyLimit(concurrency)
107+
protected[core] def apply(concurrency: Int): IntraConcurrencyLimit = {
108+
new IntraConcurrencyLimit(concurrency)
109109
}
110110

111-
override protected[core] implicit val serdes = new RootJsonFormat[ConcurrencyLimit] {
112-
def write(m: ConcurrencyLimit) = JsNumber(m.maxConcurrent)
111+
override protected[core] implicit val serdes = new RootJsonFormat[IntraConcurrencyLimit] {
112+
def write(m: IntraConcurrencyLimit) = JsNumber(m.maxConcurrent)
113113

114114
def read(value: JsValue) = {
115115
Try {
116116
val JsNumber(c) = value
117-
require(c.isWhole, "concurrency limit must be whole number")
117+
require(c.isWhole, "intra concurrency limit must be whole number")
118118

119-
ConcurrencyLimit(c.toInt)
119+
IntraConcurrencyLimit(c.toInt)
120120
} match {
121121
case Success(limit) => limit
122122
case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e)

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ protected[entity] abstract class Limits {
4646
* @param memory the memory limit in megabytes, assured to be non-null because it is a value
4747
* @param logs the limit for logs written by the container and stored in the activation record, assured to be non-null because it is a value
4848
* @param concurrency the limit on concurrently processed activations per container, assured to be non-null because it is a value
49+
* @param instances the limit in which an action can scale up to within the confines of the namespace's concurrency limit
4950
*/
5051
protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(),
5152
memory: MemoryLimit = MemoryLimit(),
5253
logs: LogLimit = LogLimit(),
53-
concurrency: ConcurrencyLimit = ConcurrencyLimit())
54+
concurrency: IntraConcurrencyLimit = IntraConcurrencyLimit(),
55+
instances: Option[InstanceConcurrencyLimit] = None)
5456
extends Limits {
5557
override protected[entity] def toJson = ActionLimits.serdes.write(this)
5658

@@ -73,19 +75,19 @@ protected[core] case class TriggerLimits protected[core] () extends Limits {
7375
protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with DefaultJsonProtocol {
7476

7577
override protected[core] implicit val serdes = new RootJsonFormat[ActionLimits] {
76-
val helper = jsonFormat4(ActionLimits.apply)
78+
val helper = jsonFormat5(ActionLimits.apply)
7779

7880
def read(value: JsValue) = {
7981
val obj = Try {
8082
value.asJsObject.convertTo[Map[String, JsValue]]
8183
} getOrElse deserializationError("no valid json object passed")
8284

83-
val time = TimeLimit.serdes.read(obj.get("timeout") getOrElse deserializationError("'timeout' is missing"))
84-
val memory = MemoryLimit.serdes.read(obj.get("memory") getOrElse deserializationError("'memory' is missing"))
85-
val logs = obj.get("logs") map { LogLimit.serdes.read(_) } getOrElse LogLimit()
86-
val concurrency = obj.get("concurrency") map { ConcurrencyLimit.serdes.read(_) } getOrElse ConcurrencyLimit()
87-
88-
ActionLimits(time, memory, logs, concurrency)
85+
val time = TimeLimit.serdes.read(obj.getOrElse("timeout", deserializationError("'timeout' is missing")))
86+
val memory = MemoryLimit.serdes.read(obj.getOrElse("memory", deserializationError("'memory' is missing")))
87+
val logs = obj.get("logs") map { LogLimit.serdes.read } getOrElse LogLimit()
88+
val concurrency = obj.get("concurrency") map { IntraConcurrencyLimit.serdes.read } getOrElse IntraConcurrencyLimit()
89+
val instances = obj.get("instances") map { InstanceConcurrencyLimit.serdes.read }
90+
ActionLimits(time, memory, logs, concurrency, instances)
8991
}
9092

9193
def write(a: ActionLimits) = helper.write(a)

common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import org.apache.openwhisk.core.entity.types.EntityStore
4040
case class ActionLimitsOption(timeout: Option[TimeLimit],
4141
memory: Option[MemoryLimit],
4242
logs: Option[LogLimit],
43-
concurrency: Option[ConcurrencyLimit])
43+
concurrency: Option[IntraConcurrencyLimit],
44+
instances: Option[InstanceConcurrencyLimit] = None)
4445

4546
/**
4647
* WhiskActionPut is a restricted WhiskAction view that eschews properties
@@ -647,7 +648,7 @@ object WhiskActionMetaData
647648
}
648649

649650
object ActionLimitsOption extends DefaultJsonProtocol {
650-
implicit val serdes = jsonFormat4(ActionLimitsOption.apply)
651+
implicit val serdes = jsonFormat5(ActionLimitsOption.apply)
651652
}
652653

653654
object WhiskActionPut extends DefaultJsonProtocol {

common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ object Messages {
7272
def tooManyConcurrentRequests(count: Int, allowed: Int) =
7373
s"Too many concurrent requests in flight (count: $count, allowed: $allowed)."
7474

75+
def maxActionInstanceConcurrencyExceedsNamespace(namespaceConcurrencyLimit: Int) =
76+
s"Max action instance concurrency must not exceed your namespace concurrency of $namespaceConcurrencyLimit."
77+
78+
def belowMinAllowedActionInstanceConcurrency(minThreshold: Int) =
79+
s"Action container concurrency must be greater than or equal to $minThreshold."
80+
7581
/** System overload message. */
7682
val systemOverloaded = "System is overloaded, try again later."
7783

core/controller/src/main/resources/apiv1swagger.json

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,8 +1800,13 @@
18001800
"concurrency": {
18011801
"type": "integer",
18021802
"format": "int32",
1803-
"description": "number of concurrent activations allowed",
1803+
"description": "number of concurrent activations allowed within an instance",
18041804
"default": 1
1805+
},
1806+
"instances": {
1807+
"type": "integer",
1808+
"format": "int32",
1809+
"description": "Max number of instances allowed for an action. Must be less than or equal to namespace concurrency limit. Default is the namespace concurrency limit."
18051810
}
18061811
}
18071812
},
@@ -2873,11 +2878,15 @@
28732878
},
28742879
"minActionConcurrency": {
28752880
"type": "integer",
2876-
"description": "Min number of concurrent activations allowed"
2881+
"description": "Min number of concurrent activations within an instance allowed"
28772882
},
28782883
"maxActionConcurrency": {
28792884
"type": "integer",
2880-
"description": "Max number of concurrent activations allowed"
2885+
"description": "Max number of concurrent activations within an instance allowed"
2886+
},
2887+
"maxActionInstances": {
2888+
"type": "integer",
2889+
"description": "Max number of concurrent instances allowed for an action"
28812890
}
28822891
}
28832892
}

0 commit comments

Comments
 (0)