Skip to content

Commit b1ccbec

Browse files
style95bdoyle0182
andauthored
Add FPC Load Balancer (#5192)
* Add FPC pool balancer * Deploy etcd for tests * Apply scalaFmt * Remove a redundant line. * Update core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala Update comments Co-authored-by: Brendan Doyle <[email protected]> * Update core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]> * Update core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]> * Update core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]> * Update core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]> * Apply comments. Co-authored-by: Brendan Doyle <[email protected]>
1 parent 9633043 commit b1ccbec

File tree

10 files changed

+1045
-6
lines changed

10 files changed

+1045
-6
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class AverageRingBuffer(private val maxSize: Int) {
3737

3838
def nonEmpty: Boolean = elements.nonEmpty
3939

40-
def average: Double = {
40+
def average: Double = {
4141
val size = elements.size
4242
if (size > 2) {
4343
(sum - max - min) / (size - 2)

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
6161
protected[loadBalancer] val activationPromises =
6262
TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
6363
protected val activationsPerNamespace = TrieMap[UUID, LongAdder]()
64+
protected val activationsPerController = TrieMap[ControllerInstanceId, LongAdder]()
65+
protected val activationsPerInvoker = TrieMap[InvokerInstanceId, LongAdder]()
6466
protected val totalActivations = new LongAdder()
6567
protected val totalBlackBoxActivationMemory = new LongAdder()
6668
protected val totalManagedActivationMemory = new LongAdder()
@@ -83,6 +85,14 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
8385
override def activeActivationsFor(namespace: UUID): Future[Int] =
8486
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue).getOrElse(0))
8587
override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue)
88+
override def activeActivationsByController(controller: String): Future[Int] =
89+
Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
90+
override def activeActivationsByController: Future[List[ActivationId]] =
91+
Future.successful(activationSlots.keySet.toList)
92+
override def activeActivationsByInvoker(invoker: String): Future[Int] =
93+
Future.successful(
94+
activationsPerInvoker.get(InvokerInstanceId(invoker.toInt, userMemory = 0.MB)).map(_.intValue()).getOrElse(0))
95+
override def close: Unit = activationFeed ! GracefulShutdown
8696

8797
/**
8898
* Calculate the duration within which a completion ack must be received for an activation.
@@ -125,6 +135,10 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
125135
totalActivationMemory.add(action.limits.memory.megabytes)
126136

127137
activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
138+
activationsPerController.getOrElseUpdate(controllerInstance, new LongAdder()).increment()
139+
activationsPerInvoker
140+
.getOrElseUpdate(InvokerInstanceId(instance.instance, userMemory = 0.MB), new LongAdder())
141+
.increment()
128142

129143
// Completion Ack must be received within the calculated time.
130144
val completionAckTimeout = calculateCompletionAckTimeout(action.limits.timeout.duration)
@@ -162,7 +176,8 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
162176
action.fullyQualifiedName(true),
163177
timeoutHandler,
164178
isBlackboxInvocation,
165-
msg.blocking)
179+
msg.blocking,
180+
controllerInstance)
166181
})
167182

168183
resultPromise
@@ -287,6 +302,10 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
287302
if (entry.isBlackbox) totalBlackBoxActivationMemory else totalManagedActivationMemory
288303
totalActivationMemory.add(entry.memoryLimit.toMB * (-1))
289304
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
305+
activationsPerController.get(entry.controllerId).foreach(_.decrement())
306+
activationsPerInvoker
307+
.get(InvokerInstanceId(entry.invokerName.instance, userMemory = 0.MB))
308+
.foreach(_.decrement())
290309

291310
invoker.foreach(releaseInvoker(_, entry))
292311

0 commit comments

Comments
 (0)