Skip to content

Commit 2d4fd18

Browse files
author
Brendan Doyle
committed
add per min throttling support to fpc
1 parent 3e3414c commit 2d4fd18

File tree

5 files changed

+27
-3
lines changed

5 files changed

+27
-3
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ object WhiskConfig {
203203
object ConfigKeys {
204204
val cluster = "whisk.cluster"
205205
val loadbalancer = "whisk.loadbalancer"
206+
val fpcLoadBalancer = "whisk.loadbalancer.fpc"
206207
val fraction = "whisk.fraction"
207208
val buildInformation = "whisk.info"
208209

core/controller/src/main/resources/reference.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ whisk {
2929
# extra time to increase the timeout for forced active acks
3030
# default is 1.minute
3131
timeout-addon = 1m
32+
33+
fpc {
34+
use-perMin-throttles = false
35+
}
3236
}
3337
controller {
3438
protocol: http

core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ protected[core] abstract class EntitlementProvider(
360360
* @param resources the set of resources must contain at least one resource that can be activated else return None
361361
* @return future completing successfully if user is below limits else failing with a rejection
362362
*/
363-
private def checkUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
363+
protected[core] def checkUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
364364
implicit transid: TransactionId): Future[Unit] = {
365365
if (right == ACTIVATE) {
366366
if (resources.exists(_.collection.path == Collection.ACTIONS)) {

core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,38 @@ import scala.concurrent.Future
2121
import akka.actor.ActorSystem
2222
import akka.http.scaladsl.model.StatusCodes.TooManyRequests
2323
import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
24-
import org.apache.openwhisk.core.WhiskConfig
24+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
2525
import org.apache.openwhisk.core.connector.{EventMessage, Metric}
2626
import org.apache.openwhisk.core.controller.RejectRequest
2727
import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE
2828
import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity}
2929
import org.apache.openwhisk.core.loadBalancer.LoadBalancer
30+
import pureconfig.loadConfigOrThrow
31+
import pureconfig.generic.auto._
32+
33+
case class FPCEntitlementConfig(usePerMinThrottles: Boolean)
3034

3135
protected[core] class FPCEntitlementProvider(
3236
private val config: WhiskConfig,
3337
private val loadBalancer: LoadBalancer,
3438
private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
3539
extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) {
3640

41+
private implicit val executionContext = actorSystem.dispatcher
42+
43+
private val fpcEntitlementConfig: FPCEntitlementConfig =
44+
loadConfigOrThrow[FPCEntitlementConfig](ConfigKeys.fpcLoadBalancer)
45+
3746
override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
47+
implicit transid: TransactionId): Future[Unit] = {
48+
if (fpcEntitlementConfig.usePerMinThrottles) {
49+
checkUserThrottle(user, right, resources).flatMap(_ => checkFPCConcurrentThrottle(user, right, resources))
50+
} else {
51+
checkFPCConcurrentThrottle(user, right, resources)
52+
}
53+
}
54+
55+
private def checkFPCConcurrentThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
3856
implicit transid: TransactionId): Future[Unit] = {
3957
if (right == ACTIVATE) {
4058
val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res =>
@@ -55,7 +73,6 @@ protected[core] class FPCEntitlementProvider(
5573
} else Future.successful(())
5674
} else Future.successful(())
5775
}
58-
5976
}
6077

6178
private object FPCEntitlementProvider extends EntitlementSpiProvider {

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Pro
3434
import scala.language.postfixOps
3535
import scala.util.{Failure, Random, Success, Try}
3636

37+
case class FPCPoolBalancerConfig(usePerMinThrottle: Boolean)
38+
3739
class FPCPoolBalancer(config: WhiskConfig,
3840
controllerInstance: ControllerInstanceId,
3941
etcdClient: EtcdClient,

0 commit comments

Comments
 (0)