Skip to content

Commit efdbd60

Browse files
[New Scheduler]Implement FPCEntitlementProvider (#5029)
* Implement FPCEntitlementProvider * Add throttling metric
1 parent 212d809 commit efdbd60

File tree

3 files changed

+85
-6
lines changed

3 files changed

+85
-6
lines changed

core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ protected[core] abstract class EntitlementProvider(
153153
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations))
154154

155155
private val messagingProvider = SpiLoader.get[MessagingProvider]
156-
private val eventProducer = messagingProvider.getProducer(this.config)
156+
protected val eventProducer = messagingProvider.getProducer(this.config)
157157

158158
/**
159159
* Grants a subject the right to access a resources.
@@ -201,6 +201,19 @@ protected[core] abstract class EntitlementProvider(
201201
.flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user))
202202
}
203203

204+
/**
205+
* Checks action activation rate throttles for an identity.
206+
*
207+
* @param user the identity to check rate throttles for
208+
* @param right the privilege the subject is requesting
209+
* @param resources the set of resource the subject requests access to
210+
* @return a promise that completes with success iff the user is within their activation quota
211+
*/
212+
protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
213+
implicit transid: TransactionId): Future[Unit] = {
214+
checkUserThrottle(user, right, resources).flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
215+
}
216+
204217
private val kindRestrictor = {
205218
import pureconfig._
206219
import pureconfig.generic.auto._
@@ -284,11 +297,7 @@ protected[core] abstract class EntitlementProvider(
284297
val entitlementCheck: Future[Unit] = if (user.rights.contains(right)) {
285298
if (resources.nonEmpty) {
286299
logging.debug(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(", ")}'")
287-
val throttleCheck =
288-
if (noThrottle) Future.successful(())
289-
else
290-
checkUserThrottle(user, right, resources)
291-
.flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
300+
val throttleCheck = if (noThrottle) Future.successful(()) else checkThrottles(user, right, resources)
292301
throttleCheck
293302
.flatMap(_ => checkPrivilege(user, right, resources))
294303
.flatMap(checkedResources => {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.entitlement
19+
20+
import scala.concurrent.Future
21+
import akka.actor.ActorSystem
22+
import akka.http.scaladsl.model.StatusCodes.TooManyRequests
23+
import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
24+
import org.apache.openwhisk.core.WhiskConfig
25+
import org.apache.openwhisk.core.connector.{EventMessage, Metric}
26+
import org.apache.openwhisk.core.controller.RejectRequest
27+
import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE
28+
import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity}
29+
import org.apache.openwhisk.core.loadBalancer.LoadBalancer
30+
31+
protected[core] class FPCEntitlementProvider(
32+
private val config: WhiskConfig,
33+
private val loadBalancer: LoadBalancer,
34+
private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
35+
extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) {
36+
37+
override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
38+
implicit transid: TransactionId): Future[Unit] = {
39+
if (right == ACTIVATE) {
40+
val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res =>
41+
loadBalancer.checkThrottle(user.namespace.name.toPath, res.fqname)
42+
}
43+
if (checks.contains(true)) {
44+
val metric = Metric("ConcurrentRateLimit", 1)
45+
UserEvents.send(
46+
eventProducer,
47+
EventMessage(
48+
s"controller${controllerInstance.asString}",
49+
metric,
50+
user.subject,
51+
user.namespace.name.toString,
52+
user.namespace.uuid,
53+
metric.typeName))
54+
Future.failed(RejectRequest(TooManyRequests, "Too many requests"))
55+
} else Future.successful(())
56+
} else Future.successful(())
57+
}
58+
59+
}
60+
61+
private object FPCEntitlementProvider extends EntitlementSpiProvider {
62+
63+
override def instance(config: WhiskConfig, loadBalancer: LoadBalancer, instance: ControllerInstanceId)(
64+
implicit actorSystem: ActorSystem,
65+
logging: Logging) =
66+
new FPCEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer, instance)
67+
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ trait LoadBalancer {
7575

7676
/** Gets the size of the cluster all loadbalancers are acting in */
7777
def clusterSize: Int = 1
78+
79+
/** Gets the throttling for given action. */
80+
def checkThrottle(namespace: EntityPath, action: String): Boolean = false
7881
}
7982

8083
/**

0 commit comments

Comments
 (0)