Skip to content

Provide action limit configuration for each namespace #5229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c33b6ee
Provide action limit config for namespace
upgle Apr 27, 2022
ae5540a
Check namespace default limit
upgle May 3, 2022
8c01164
Validate system limits and namespace default limits
upgle May 3, 2022
a514a1c
Fix test code
upgle May 3, 2022
1ac6434
Add system limit test
upgle May 3, 2022
a30295f
Refactor code
upgle May 4, 2022
dca6ade
Reject message in invoker reactive
upgle May 4, 2022
3f9f983
Change config key
upgle May 4, 2022
2e19c9d
Refactor code
upgle May 4, 2022
7b81826
Add ansible config
upgle May 8, 2022
83b4258
Update annotation for maxConcurrent
upgle May 9, 2022
c3c0715
Add test case for limit api
upgle May 9, 2022
1e18a31
Rename limit config key
upgle May 9, 2022
cab437b
Update swagger
upgle May 9, 2022
f1446ad
Update document
upgle May 9, 2022
227d431
Add parameter size limit
upgle May 10, 2022
0670a80
Refactor code
upgle May 10, 2022
e8ce495
Update limits API
upgle May 10, 2022
cbddbb6
Rename allowedDuration -> allowedActionDuration
upgle May 10, 2022
e438be1
Add test case for parameter limit
upgle May 10, 2022
b88a8b9
Refactor code
upgle May 10, 2022
f8ca8a0
Add request payload limit for namespace
upgle May 11, 2022
467ac5a
Check activation result size with namespace payload limit
upgle May 11, 2022
4ad15fa
Provide truncation size option for namespace
upgle May 11, 2022
689d10a
Support scheduler
upgle May 13, 2022
703f3e1
Supports backwards compatibility for new limit config
upgle May 13, 2022
0dd5202
Update wskadmin
upgle May 28, 2022
9dc2037
Add parameter annotation for truncation
upgle Jul 25, 2022
00dc2fb
Fix test code for KubernetesContainerTests
upgle Jul 25, 2022
94a4634
Fix test code for DockerContainerTests
upgle Jul 25, 2022
3c63ea8
Fix test cases
upgle Jul 25, 2022
66ba50b
Fix build error
upgle Aug 2, 2022
28ade66
Fix build error
upgle Aug 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ansible/environments/jenkins/group_vars/openwhisk-vm1-he-de
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@ container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500

namespace_default_limit_action_concurrency_max: 500

invoker1_machine: openwhisk-vm3-he-de
invoker_use_runc: false
2 changes: 2 additions & 0 deletions ansible/environments/jenkins/group_vars/openwhisk-vm2-he-de
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@ runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
limit_invocations_per_minute: 120

namespace_default_limit_action_concurrency_max: 500

invoker1_machine: openwhisk-vm1-he-de
invoker_use_runc: false
2 changes: 2 additions & 0 deletions ansible/environments/jenkins/group_vars/openwhisk-vm3-he-de
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@ container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500

namespace_default_limit_action_concurrency_max: 500

invoker1_machine: openwhisk-vm2-he-de
invoker_use_runc: false
1 change: 1 addition & 0 deletions ansible/environments/local/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ env_hosts_dir: "{{ playbook_dir }}/environments/local"
container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
namespace_default_limit_action_concurrency_max: 500
9 changes: 9 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"

"CONFIG_whisk_namespaceDefaultLimit_memory_min": "{{ namespace_default_limit_action_memory_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_memory_max": "{{ namespace_default_limit_action_memory_max | default() }}"

"CONFIG_whisk_namespaceDefaultLimit_timeLimit_min": "{{ namespace_default_limit_action_time_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_timeLimit_max": "{{ namespace_default_limit_action_time_max | default() }}"

"CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_min": "{{ namespace_default_limit_action_concurrency_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_max": "{{ namespace_default_limit_action_concurrency_max | default() }}"

"CONFIG_whisk_featureFlags_requireApiKeyAnnotation": "{{ whisk.feature_flags.require_api_key_annotation | default(true) | lower }}"
"CONFIG_whisk_featureFlags_requireResponsePayload": "{{ whisk.feature_flags.require_response_payload | default(true) | lower }}"

Expand Down
6 changes: 6 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@
"CONFIG_whisk_concurrencyLimit_min": "{{ limit_action_concurrency_min | default() }}"
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_memory_min": "{{ namespace_default_limit_action_memory_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_memory_max": "{{ namespace_default_limit_action_memory_max | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_timeLimit_min": "{{ namespace_default_limit_action_time_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_timeLimit_max": "{{ namespace_default_limit_action_time_max | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_min": "{{ namespace_default_limit_action_concurrency_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_max": "{{ namespace_default_limit_action_concurrency_max | default() }}"
"CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
"CONFIG_whisk_transactions_header": "{{ transactions.header }}"
"CONFIG_whisk_containerPool_akkaClient": "{{ container_pool_akka_client | default('false') | lower }}"
Expand Down
9 changes: 9 additions & 0 deletions ansible/roles/schedulers/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"

"CONFIG_whisk_namespaceDefaultLimit_memory_min": "{{ namespace_default_limit_action_memory_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_memory_max": "{{ namespace_default_limit_action_memory_max | default() }}"

"CONFIG_whisk_namespaceDefaultLimit_timeLimit_min": "{{ namespace_default_limit_action_time_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_timeLimit_max": "{{ namespace_default_limit_action_time_max | default() }}"

"CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_min": "{{ namespace_default_limit_action_concurrency_min | default() }}"
"CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_max": "{{ namespace_default_limit_action_concurrency_max | default() }}"

"RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
"CONFIG_whisk_runtimes_defaultImagePrefix":
"{{ runtimes_default_image_prefix | default() }}"
Expand Down
31 changes: 31 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ whisk {
std = 1
}

# maximum size of the action parameter
parameter-size-limit = 1 m

# maximum size of the action code
exec-size-limit = 48 m

Expand All @@ -517,6 +520,34 @@ whisk {
default-list-limit = 30 # default limit on number of entities returned from a collection on a list operation
}

# default namespace limit settings
# Disabled for backwards compatibility. If you want to use it, either uncomment it or add the setting at deployment time.
# namespace-default-limit {
# memory {
# min = 128 m
# max = 512 m
# }
# time-limit {
# min = 100 ms
# max = 5 m
# }
# log-limit {
# min = 0 m
# max = 10 m
# }
# concurrency-limit {
# min = 1
# max = 1
# }
# parameter-size-limit = 1 m
# activation {
# payload {
# max = 1 m
# truncation = 1 m
# }
# }
# }

yarn {
master-url="http://localhost:8088" //YARN Resource Manager endpoint to be accessed from the invoker
yarn-link-log-message=true //If true, display a link to YARN in the static log message, otherwise do not include a link to YARN.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ object ConfigKeys {
val timeLimit = "whisk.time-limit"
val logLimit = "whisk.log-limit"
val concurrencyLimit = "whisk.concurrency-limit"
val parameterSizeLimit = "whisk.parameter-size-limit"

val namespaceMemoryLimit = "whisk.namespace-default-limit.memory"
val namespaceTimeLimit = "whisk.namespace-default-limit.time-limit"
val namespaceLogLimit = "whisk.namespace-default-limit.log-limit"
val namespaceConcurrencyLimit = "whisk.namespace-default-limit.concurrency-limit"
val namespaceParameterSizeLimit = "whisk.namespace-default-limit.parameter-size-limit"
val namespaceActivationPayloadLimit = "whisk.namespace-default-limit.activation.payload"

val activation = "whisk.activation"
val userEvents = "whisk.user-events"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,8 @@ object ContainerCreationError extends Enumeration {

case object TooManyConcurrentRequests extends ContainerCreationError

case object InvalidActionLimitError extends ContainerCreationError

val whiskErrors: Set[ContainerCreationError] =
Set(
NoAvailableInvokersError,
Expand All @@ -684,6 +686,7 @@ object ContainerCreationError extends Enumeration {
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
case "INVALIDACTIONLIMITERROR" => InvalidActionLimitError
}

implicit val serds = new RootJsonFormat[ContainerCreationError] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ import org.apache.openwhisk.common.MetricEmitter
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
import org.apache.openwhisk.core.entity.ActivationResponse._
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.http.PoolingRestClient

import java.time.Instant

/**
Expand All @@ -61,16 +62,13 @@ import java.time.Instant
* @param hostname the host name
* @param port the port
* @param timeout the timeout in msecs to wait for a response
* @param maxResponse the maximum size in bytes the connection will accept
* @param queueSize once all connections are used, how big of queue to allow for additional requests
* @param retryInterval duration between retries for TCP connection errors
*/
protected class AkkaContainerClient(
hostname: String,
port: Int,
timeout: FiniteDuration,
maxResponse: ByteSize,
truncation: ByteSize,
queueSize: Int,
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
Expand All @@ -87,12 +85,19 @@ protected class AkkaContainerClient(
*
* @param endpoint the path the api call relative to hostname
* @param body the JSON value to post (this is usually a JSON objecT)
* @param maxResponse the maximum size in bytes the connection will accept
* @param truncation the truncation size in bytes
* @param retry whether or not to retry on connection failure
* @param reschedule whether or not to throw ContainerHealthError (triggers reschedule) on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
def post(
endpoint: String,
body: JsValue,
maxResponse: ByteSize,
truncation: ByteSize,
retry: Boolean,
reschedule: Boolean = false)(implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {

//create the request
val req = Marshal(body).to[MessageEntity].map { b =>
Expand All @@ -115,7 +120,7 @@ protected class AkkaContainerClient(
Right(ContainerResponse(response.status.intValue, o, None))
}
} else {
truncated(response.entity.dataBytes).map { s =>
truncated(truncation, response.entity.dataBytes).map { s =>
Right(ContainerResponse(response.status.intValue, s, Some(contentLength.B, maxResponse)))
}
}
Expand Down Expand Up @@ -167,7 +172,8 @@ protected class AkkaContainerClient(
}
}

private def truncated(responseBytes: Source[ByteString, _],
private def truncated(truncation: ByteSize,
responseBytes: Source[ByteString, _],
previouslyCaptured: ByteString = ByteString.empty): Future[String] = {
responseBytes.prefixAndTail(1).runWith(Sink.head).flatMap {
case (Nil, tail) =>
Expand All @@ -176,7 +182,7 @@ protected class AkkaContainerClient(
case (Seq(prefix), tail) =>
val truncatedResponse = previouslyCaptured ++ prefix
if (truncatedResponse.size < truncation.toBytes) {
truncated(tail, truncatedResponse)
truncated(truncation, tail, truncatedResponse)
} else {
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
//captured string MAY be larger than the truncation size, so take only truncation bytes to get the exact length
Expand All @@ -194,7 +200,7 @@ object AkkaContainerClient {
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsObject]) = {
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
val connection = new AkkaContainerClient(host, port, timeout, 1)
val response = executeRequest(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
connection.close()
Expand All @@ -207,7 +213,7 @@ object AkkaContainerClient {
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsArray]) = {
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
val connection = new AkkaContainerClient(host, port, timeout, 1)
val response = executeRequestForJsArray(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
connection.close()
Expand All @@ -220,7 +226,7 @@ object AkkaContainerClient {
tid: TransactionId,
as: ActorSystem,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
val connection = new AkkaContainerClient(host, port, timeout, 1)
val futureResults = contents.map { executeRequest(connection, endPoint, _) }
val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
connection.close()
Expand All @@ -234,7 +240,12 @@ object AkkaContainerClient {
tid: TransactionId): Future[(Int, Option[JsObject])] = {

val res = connection
.post(endpoint, content, true)
.post(
endpoint,
content,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
true)
.map({
case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
Expand All @@ -254,7 +265,12 @@ object AkkaContainerClient {
tid: TransactionId): Future[(Int, Option[JsArray])] = {

val res = connection
.post(endpoint, content, true)
.post(
endpoint,
content,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
retry = true)
.map({
case Right(r) => (r.statusCode, Try(r.entity.parseJson.convertTo[JsArray]).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
Expand Down
Loading