diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index cd33ffe17d3..cae43b65fc2 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -203,6 +203,7 @@ object WhiskConfig { object ConfigKeys { val cluster = "whisk.cluster" val loadbalancer = "whisk.loadbalancer" + val fpcLoadBalancer = "whisk.loadbalancer.fpc" val fraction = "whisk.fraction" val buildInformation = "whisk.info" diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf index 70d2fae517f..dfde945a003 100644 --- a/core/controller/src/main/resources/reference.conf +++ b/core/controller/src/main/resources/reference.conf @@ -29,6 +29,10 @@ whisk { # extra time to increase the timeout for forced active acks # default is 1.minute timeout-addon = 1m + + fpc { + use-perMin-throttles = false + } } controller { protocol: http diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala index 8092ad8f0bd..88086d20fbb 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala @@ -360,7 +360,7 @@ protected[core] abstract class EntitlementProvider( * @param resources the set of resources must contain at least one resource that can be activated else return None * @return future completing successfully if user is below limits else failing with a rejection */ - private def checkUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])( + protected[core] def checkUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])( implicit transid: TransactionId): Future[Unit] = { if (right == ACTIVATE) { if (resources.exists(_.collection.path == Collection.ACTIONS)) { diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala index 9e89db23dc9..3c5a3256968 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala @@ -21,12 +21,16 @@ import scala.concurrent.Future import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes.TooManyRequests import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents} -import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.core.connector.{EventMessage, Metric} import org.apache.openwhisk.core.controller.RejectRequest import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity} import org.apache.openwhisk.core.loadBalancer.LoadBalancer +import pureconfig.loadConfigOrThrow +import pureconfig.generic.auto._ + +case class FPCEntitlementConfig(usePerMinThrottles: Boolean) protected[core] class FPCEntitlementProvider( private val config: WhiskConfig, @@ -34,7 +38,21 @@ protected[core] class FPCEntitlementProvider( private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging) extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) { + private implicit val executionContext = actorSystem.dispatcher + + private val fpcEntitlementConfig: FPCEntitlementConfig = + loadConfigOrThrow[FPCEntitlementConfig](ConfigKeys.fpcLoadBalancer) + override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])( + implicit transid: TransactionId): Future[Unit] = { + if (fpcEntitlementConfig.usePerMinThrottles) { + checkUserThrottle(user, right, resources).flatMap(_ => checkFPCConcurrentThrottle(user, right, resources)) + } else { + checkFPCConcurrentThrottle(user, right, resources) + } + } + + private def checkFPCConcurrentThrottle(user: Identity, right: Privilege, resources: Set[Resource])( implicit transid: TransactionId): Future[Unit] = { if (right == ACTIVATE) { val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res => @@ -55,7 +73,6 @@ protected[core] class FPCEntitlementProvider( } else Future.successful(()) } else Future.successful(()) } - } private object FPCEntitlementProvider extends EntitlementSpiProvider { diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala index f606f64f8a0..8bb2d22c395 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala @@ -34,6 +34,8 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Pro import scala.language.postfixOps import scala.util.{Failure, Random, Success, Try} +case class FPCPoolBalancerConfig(usePerMinThrottle: Boolean) + class FPCPoolBalancer(config: WhiskConfig, controllerInstance: ControllerInstanceId, etcdClient: EtcdClient,