Add FPC Load Balancer #5192
Conversation
Codecov Report
@@ Coverage Diff @@
## master #5192 +/- ##
===========================================
+ Coverage 45.58% 74.09% +28.50%
===========================================
Files 234 235 +1
Lines 13389 13648 +259
Branches 551 546 -5
===========================================
+ Hits 6103 10112 +4009
+ Misses 7286 3536 -3750
Continue to review full report at Codecov.
It's ready to review.
  }
  .getOrElse {
    FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
      createQueue(invocationNamespace, action.toWhiskAction, msg.action, msg.revision, scheduler)
If there is no queue, it tries to create a queue asynchronously.
topicBaseName.flatMap { baseName =>
  val topicName = Controller.topicPrefix + baseName
  val activationResult = setupActivation(msg, action)
  sendActivationToKafka(messageProducer, msg, topicName).map(_ => activationResult)
Along with the queue creation, it sends the activation to Kafka at the same time.
}
override def receive: Receive = {
  case WatchEndpointRemoved(watchKey, key, value, true) =>
It watches ETCD so that it gets events whenever queues/schedulers are added or removed, and updates its endpoint maps accordingly.
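A minimal sketch of such a watch handler, with simplified event types modeled on the ones quoted above (the real actor keeps richer state than a single string map; everything here is illustrative):

import akka.actor.Actor
import scala.collection.mutable

// Hypothetical copies of the watch events quoted above, reduced to strings.
case class WatchEndpointAdded(watchKey: String, key: String, value: String, isPrefix: Boolean)
case class WatchEndpointRemoved(watchKey: String, key: String, value: String, isPrefix: Boolean)

class EndpointWatchSketch extends Actor {
  // key -> endpoint, e.g. scheduler id -> host:port
  private val schedulerEndpoints = mutable.Map.empty[String, String]

  override def receive: Receive = {
    case WatchEndpointAdded(_, key, value, true) =>
      // a scheduler/queue endpoint appeared in ETCD
      schedulerEndpoints.update(key, value)
    case WatchEndpointRemoved(_, key, _, true) =>
      // the endpoint's key was deleted or its lease expired
      schedulerEndpoints.remove(key)
  }
}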
Is it possible to get out of sync here? What if a queue is removed by the scheduler or the scheduler is brought down and this message isn't delivered to the load balancer? Can that happen? And if it can, would that cause problems?
These endpoints are kept alive by a lease.
When a component goes wrong and cannot send a keepalive request for a given time (10s by default), its data is automatically removed and controllers receive this event.
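To make this concrete, here is a minimal sketch of the lease mechanism, assuming a hypothetical EtcdLeaseClient interface (this is not the actual client API OpenWhisk uses):

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import akka.actor.ActorSystem

// Hypothetical lease interface, only to illustrate the mechanism.
trait EtcdLeaseClient {
  def grant(ttlSeconds: Long): Future[Long] // returns a lease id
  def putWithLease(key: String, value: String, leaseId: Long): Future[Unit]
  def keepAliveOnce(leaseId: Long): Future[Unit]
}

def registerEndpoint(client: EtcdLeaseClient, key: String, value: String)(
  implicit system: ActorSystem,
  ec: ExecutionContext): Future[Unit] =
  for {
    leaseId <- client.grant(10) // 10s TTL, matching the default mentioned above
    _ <- client.putWithLease(key, value, leaseId)
  } yield {
    // Refresh well inside the TTL. If this component dies, the lease expires,
    // ETCD removes the key, and watchers receive WatchEndpointRemoved.
    system.scheduler.scheduleAtFixedRate(Duration.Zero, 3.seconds) { () =>
      client.keepAliveOnce(leaseId); ()
    }
    ()
  }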
override def checkThrottle(namespace: EntityPath, fqn: String): Boolean = {

/**
 * Note! The throttle key is assumed to exist unconditionally; if it is not present, the request is not throttled.
Throttling works as follows.
The unit of throttling is now the number of containers, and there are two kinds of throttling, 1. namespace throttling and 2. action throttling, with one configuration, the concurrent limit.
Both throttlings are enabled under the following scenario. Let's say the concurrent limit of a namespace is 30.
- When there are 30 containers running in the namespace, namespace throttling is enabled.
- When namespace throttling is enabled, no new kind of action is accepted.
  - For example, if actionA is invoked and there are 30 containers for actionA, only activations for actionA are allowed.
  - When another actionB is invoked, it is rejected with 429 Too Many Requests. This is because it's not possible to create more containers in the namespace.
  - When two actions actionA and actionB are running, activations for only those two actions are accepted in the same way.
- If the workload keeps increasing, activations for actionA will pile up in a queue because there are already 30 containers running and no more containers can be created.
- If too many activations are buffered in a queue, action throttling is enabled. This means that the current containers cannot handle all activations in time. This threshold is configurable.
  - There are two action throttling configurations: max retention size and throttling fraction.
  - If there are more activations in a queue than the max retention size, action throttling is enabled.
  - If there are fewer activations than max-retention-size * throttling-fraction, action throttling is disabled. (This is to prevent too-frequent throttling changes.)
- So when a new activation comes, the load balancer checks whether the activation is throttled in this way:
  - Check if action throttling is enabled; if yes, reject the request.
  - If action throttling is not enabled, accept the request.
  - If action throttling information does not exist, check namespace throttling. (No information means no queue has been created for the action.)
  - If namespace throttling is enabled, reject the activation. This is because no container is running and the namespace is already throttled.
  - If namespace throttling is disabled or no information exists, accept the activation.
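A minimal sketch of this decision order and the hysteresis, assuming simple in-memory maps for the flags (the real FPCPoolBalancer reads them from ETCD; all names and threshold values here are illustrative):

import scala.collection.concurrent.TrieMap

object ThrottleSketch {
  // Hysteresis thresholds (hypothetical values).
  val maxRetentionSize = 1000
  val throttlingFraction = 0.9

  // fully-qualified action name -> action throttling flag
  val actionThrottled = TrieMap.empty[String, Boolean]
  // namespace -> namespace throttling flag
  val namespaceThrottled = TrieMap.empty[String, Boolean]

  // Scheduler side: flip the action flag with hysteresis so it does not
  // oscillate around a single threshold.
  def updateActionThrottle(fqn: String, queued: Int): Unit =
    if (queued > maxRetentionSize) actionThrottled.update(fqn, true)
    else if (queued < maxRetentionSize * throttlingFraction) actionThrottled.update(fqn, false)

  // Load-balancer side: decision order for an incoming activation.
  // Returns true when the activation must be rejected with 429.
  def checkThrottle(namespace: String, fqn: String): Boolean =
    actionThrottled.get(fqn) match {
      case Some(flag) =>
        flag // a queue exists for the action: the action flag alone decides
      case None =>
        // no queue yet for this action: fall back to the namespace flag;
        // no information at all means accept
        namespaceThrottled.getOrElse(namespace, false)
    }
}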
For action throttling, is there no longer any concept of invocations-per-minute throttling? Per-minute throttling was useful for user applications to protect the downstream services that their actions call. Now it seems like there's no way to throttle based on what the action application can handle; it's strictly system throttling to protect OpenWhisk.
Yes, there is no such thing.
If that is necessary, we can add a new feature to the scheduler part; I believe we can control the rate at which activations are pulled out of a queue.
And if it is critical and urgent, we can indirectly limit invocations with the number of containers, assuming one action's execution time does not vary drastically (for example, 30 containers with an average execution time of 100ms yields at most roughly 300 invocations per second).
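Purely as an illustration of that idea (nothing like this exists in this PR; all names are hypothetical), pulls from a queue could be paced at a fixed rate:

import scala.concurrent.duration._
import akka.actor.ActorSystem

// Hypothetical: pace queue pulls so at most maxPerSecond activations are
// handed out per second, instead of pulling as fast as containers ask.
final class ThrottledPuller(system: ActorSystem, maxPerSecond: Int)(pullOne: () => Unit) {
  import system.dispatcher
  private val interval = (1000 / maxPerSecond).max(1).millis

  system.scheduler.scheduleAtFixedRate(Duration.Zero, interval) { () =>
    pullOne()
  }
}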
I'm really doing a deep dive into the code and I'm a little confused about namespace throttling and how you prevent starvation of other actions within a namespace. As you explain, if actionA has all of the containers for a namespace and is at the limit, but the namespace also wants to run some of actionB, they just get throttled? If actionA sends consistent load to keep up with the container limit for the namespace but doesn't backlog the queue enough to turn on action throttling, how would actionB ever get to run any instances? In the old scheduling algorithm it's just random based on how many activations are in flight, so even if you're hitting the concurrent throttle, you don't have to worry about one action completely starving another action in the namespace. Am I understanding this correctly?
I think in such a case, it is natural to increase the namespace limit.
If someone regularly invokes an action utilizing all the resources and tries to run another one, and both actions are supposed to run well, they need a higher resource limit. In the above case, there are not enough resources, and I feel it's not that weird to get a 429 Too Many Requests response.
When at least one container is created for actionB before the namespace limit is reached, actionB can still run as expected even after the namespace is throttled. However, there are not enough containers, activations will pile up in queues, and eventually the system will reject requests with 429 responses if they keep coming in at the same rate.
To be able to run both of them well, it is necessary to increase the limit to accommodate all of them.
Even if we increase the limit, it would be fine because the system doesn't create more containers than required.
    .toString)
  }
  .getOrElse {
    FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
I think the code in this getOrElse can be moved to its own function, since it's reused below in a different getOrElse.
I looked into the code again.
Actually, there are two different pieces of logic: one is to select the right scheduler, and the other is to create a queue on the given scheduler. Since the createQueue method uses recursion, it is better kept as a standalone method.
So if we create a new method for this kind of duplication, it would be just a wrapper around this code as-is. The wrapper method would contain nothing but two method calls, schedule and createQueue, without any other logic, and its parameters would be just the sum of the two underlying methods' parameters.
I am inclined to keep the original code, HDYT?
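For reference, here is roughly what the proposed wrapper would look like, using hypothetical stand-in types; its body would be nothing but the two calls composed, which is the trade-off being weighed here:

import scala.concurrent.{ExecutionContext, Future}

object QueueCreationSketch {
  // Hypothetical stand-ins for the real types.
  case class SchedulerEndpoint(host: String, port: Int)

  def schedule(schedulers: IndexedSeq[SchedulerEndpoint]): Future[SchedulerEndpoint] = ???
  def createQueue(namespace: String, action: String, scheduler: SchedulerEndpoint): Future[Unit] = ???

  // The wrapper's parameter list is the union of the two methods' parameters,
  // and its body is just the two calls chained.
  def scheduleAndCreateQueue(schedulers: IndexedSeq[SchedulerEndpoint],
                             namespace: String,
                             action: String)(implicit ec: ExecutionContext): Future[Unit] =
    schedule(schedulers).flatMap(scheduler => createQueue(namespace, action, scheduler))
}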
 * @param excludeSchedulers schedulers which should not be chosen
 * @return the assigned scheduler
 */
def schedule(schedulers: IndexedSeq[SchedulerStates],
I'm not sure I agree with overloading the function name schedule. The two methods in this class seemingly do two different things: one picks an invoker to send an activation to, and the other picks a scheduler to create a queue for the action tied to that activation. So I think these functions should be renamed to be more specific.
 * @return the assigned invoker
 */
// TODO add input/output example
def schedule(invokers: IndexedSeq[InvokerHealth], minMemory: ByteSize): Option[InvokerInstanceId] = {
I'm a little confused by picking an invoker here. Isn't it the responsibility of the scheduler to pick an invoker now, not the controller? Or is this method being called from the scheduler?
Oh, this should be deleted.
This is only required by our downstream to send a message for debugging invokers.
      case _ =>
        activationFeed ! MessageFeed.Processed
        logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
Should this be a warn instead of an error?
implicit val transid = entry.transactionId
logging.warn(
  this,
  s"The $key is deleted from ETCD, but there are still unhandled activations for this action, try to create a new queue")
Trying to remember the architecture here, but how do unhandled activations still get retried after creating the new queue? Is it because this load balancer puts them on Kafka and they'd just be sitting there waiting to be consumed by the scheduler with the recreated queue? If the ETCD queue gets deleted with an activation in it, then that activation is just lost at that point, right, since it's already been read off Kafka?
When there is an activation in a queue, the queue is not deleted.
Unhandled activations here come from the possibility that, while the queue is being deleted because of a timeout (no more activations), new activations can still arrive on the controller side.
Regarding the role of a controller, yes, it is now rather passive.
It just selects the right scheduler, sends a queue creation request, and puts activations on one of the schedulers' Kafka topics.
Still, activations can be delivered to a scheduler that has no queue for them; that scheduler then looks for (with retries) the target scheduler that has the queue and forwards the activations to it.
(I hope we can remove Kafka from the critical path entirely in the future. Then we can probably consolidate these two calls into one.)
So the situation here is supposed to rarely happen.
Once a queue is created, the activations will be forwarded to the proper scheduler.
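A condensed sketch of that recovery path (all names are hypothetical stand-ins for the real ones):

import scala.concurrent.Future

object QueueRecoverySketch {
  final case class QueueKey(namespace: String, action: String)

  // Called when ETCD reports that a queue key was removed.
  def onQueueRemoved(key: QueueKey,
                     unhandledActivations: QueueKey => Int,
                     recreateQueue: QueueKey => Future[Unit]): Unit =
    if (unhandledActivations(key) > 0) {
      // Rare case: the queue timed out for idleness while new activations
      // were arriving concurrently on the controller side, so ask a
      // scheduler to create the queue again and drain them.
      recreateQueue(key)
      ()
    }
}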
…alancer/FPCPoolBalancer.scala Update comments Co-authored-by: Brendan Doyle <[email protected]>
…alancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]>
…alancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]>
…alancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]>
…alancer/FPCPoolBalancer.scala Co-authored-by: Brendan Doyle <[email protected]>
Description
This is to add the FPCPoolBalancer.
I opened a new PR to see if there is any difference in the Travis CI environment.
Old one: #5158
Related issue and scope
My changes affect the following components
Types of changes
Checklist: