-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[New Scheduler] Implement InvokerHealthyManager #5061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.common | ||
|
||
case object GracefulShutdown | ||
case object Enable |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,11 +29,18 @@ import scala.util.Try | |
* @param instance a numeric value used for the load balancing and Kafka topic creation | ||
* @param uniqueName an identifier required for dynamic instance assignment by Zookeeper | ||
* @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names | ||
* @param userMemory invoker user memory | ||
* @param busyMemory invoker busy memory | ||
* @param tags actions which included specified annotation tags can be run on this invoker | ||
* @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on this invoker | ||
*/ | ||
case class InvokerInstanceId(val instance: Int, | ||
uniqueName: Option[String] = None, | ||
displayedName: Option[String] = None, | ||
val userMemory: ByteSize) | ||
val userMemory: ByteSize, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this going to affect the message bus? As always I have to check how this will affect rolling restarts of the controllers and invokers, will one component be unhealthy while the other is upgraded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. I tested in my local, doesn't affect the message bus. This pr's invoker in upsteam master's controller's healthy status is
|
||
val busyMemory: Option[ByteSize] = None, | ||
val tags: Seq[String] = Seq.empty[String], | ||
val dedicatedNamespaces: Seq[String] = Seq.empty) | ||
extends InstanceId { | ||
def toInt: Int = instance | ||
|
||
|
@@ -76,7 +83,12 @@ object InvokerInstanceId extends DefaultJsonProtocol { | |
val fields = new ListBuffer[(String, JsValue)] | ||
fields ++= List("instance" -> JsNumber(i.instance)) | ||
fields ++= List("userMemory" -> JsString(i.userMemory.toString)) | ||
i.busyMemory.foreach { busyMemory => | ||
fields ++= List("busyMemory" -> JsString(busyMemory.toString)) | ||
} | ||
fields ++= List("instanceType" -> JsString(i.instanceType)) | ||
fields ++= List("tags" -> JsArray(i.tags.map(_.toJson): _*)) | ||
fields ++= List("dedicatedNamespaces" -> JsArray(i.dedicatedNamespaces.map(_.toJson): _*)) | ||
i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" -> JsString(uniqueName))) | ||
i.displayedName.foreach(displayedName => fields ++= List("displayedName" -> JsString(displayedName))) | ||
JsObject(fields.toSeq: _*) | ||
|
@@ -87,10 +99,20 @@ object InvokerInstanceId extends DefaultJsonProtocol { | |
val uniqueName = fromField[Option[String]](json, "uniqueName") | ||
val displayedName = fromField[Option[String]](json, "displayedName") | ||
val userMemory = fromField[String](json, "userMemory") | ||
val busyMemory = fromField[Option[String]](json, "busyMemory") | ||
val instanceType = fromField[String](json, "instanceType") | ||
val tags = fromField[Seq[String]](json, "tags") | ||
val dedicatedNamespaces = fromField[Seq[String]](json, "dedicatedNamespaces") | ||
|
||
if (instanceType == "invoker") { | ||
new InvokerInstanceId(instance, uniqueName, displayedName, ByteSize.fromString(userMemory)) | ||
new InvokerInstanceId( | ||
instance, | ||
uniqueName, | ||
displayedName, | ||
ByteSize.fromString(userMemory), | ||
busyMemory.map(ByteSize.fromString(_)), | ||
tags, | ||
dedicatedNamespaces) | ||
} else { | ||
deserializationError("could not read InvokerInstanceId") | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.core.containerpool.v2 | ||
|
||
import akka.actor.ActorRef | ||
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints | ||
|
||
// Event send by the actor | ||
case class ClientCreationCompleted(client: Option[ActorRef] = None) | ||
case object ClientClosed | ||
case object CloseClientProxy | ||
|
||
// Event received by the actor | ||
case object StartClient | ||
case class RequestActivation(lastDuration: Option[Long] = None, newScheduler: Option[SchedulerEndpoints] = None) | ||
|
||
// TODO, use grpc to fetch activation from memoryQueue | ||
class ActivationClientProxy {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.core.containerpool.v2 | ||
|
||
import java.time.Instant | ||
|
||
import akka.actor.ActorRef | ||
import org.apache.openwhisk.common.TransactionId | ||
import org.apache.openwhisk.core.containerpool.Container | ||
import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction} | ||
import org.apache.openwhisk.core.entity.size._ | ||
|
||
// Events received by the actor | ||
case class Initialize(invocationNamespace: String, | ||
action: ExecutableWhiskAction, | ||
schedulerHost: String, | ||
rpcPort: Int, | ||
transId: TransactionId) | ||
case class Start(exec: CodeExec[_], memoryLimit: ByteSize) | ||
|
||
// Event sent by the actor | ||
case class ContainerCreationFailed(throwable: Throwable) | ||
case class ContainerIsPaused(data: WarmData) | ||
case class ClientCreationFailed(throwable: Throwable, | ||
container: Container, | ||
invocationNamespace: String, | ||
action: ExecutableWhiskAction) | ||
case class ReadyToWork(data: Data) | ||
case class Initialized(data: InitializedData) | ||
case class Resumed(data: WarmData) | ||
case class ResumeFailed(data: WarmData) | ||
case class RecreateClient(action: ExecutableWhiskAction) | ||
case object ContainerRemoved // when container is destroyed | ||
|
||
// States | ||
sealed trait ProxyState | ||
case object LeaseStart extends ProxyState | ||
case object Uninitialized extends ProxyState | ||
case object CreatingContainer extends ProxyState | ||
case object ContainerCreated extends ProxyState | ||
case object CreatingClient extends ProxyState | ||
case object ClientCreated extends ProxyState | ||
case object Running extends ProxyState | ||
case object Pausing extends ProxyState | ||
case object Paused extends ProxyState | ||
case object Removing extends ProxyState | ||
case object Rescheduling extends ProxyState | ||
|
||
// Data | ||
sealed abstract class Data(val memoryLimit: ByteSize) { | ||
def getContainer: Option[Container] | ||
} | ||
case class NonexistentData() extends Data(0.B) { | ||
override def getContainer = None | ||
} | ||
case class MemoryData(override val memoryLimit: ByteSize) extends Data(memoryLimit) { | ||
override def getContainer = None | ||
} | ||
trait WithClient { val clientProxy: ActorRef } | ||
case class PreWarmData(container: Container, kind: String, override val memoryLimit: ByteSize) | ||
extends Data(memoryLimit) { | ||
override def getContainer = Some(container) | ||
} | ||
|
||
case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction) | ||
extends Data(action.limits.memory.megabytes.MB) { | ||
override def getContainer = Some(container) | ||
} | ||
|
||
case class InitializedData(container: Container, | ||
invocationNamespace: String, | ||
action: ExecutableWhiskAction, | ||
override val clientProxy: ActorRef) | ||
extends Data(action.limits.memory.megabytes.MB) | ||
with WithClient { | ||
override def getContainer = Some(container) | ||
} | ||
|
||
case class WarmData(container: Container, | ||
invocationNamespace: String, | ||
action: ExecutableWhiskAction, | ||
revision: DocRevision, | ||
lastUsed: Instant, | ||
override val clientProxy: ActorRef) | ||
extends Data(action.limits.memory.megabytes.MB) | ||
with WithClient { | ||
override def getContainer = Some(container) | ||
} | ||
|
||
// TODO | ||
class FunctionPullingContainerProxy {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are tags used for?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used for some special actions. e.g. need high memory, need power gpu.
We can make the action's invocations run on corresponding invokers (BTW, this requirement comes from user in our company)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, dedicatedNamespaces means dedicatedNamespaces's all actions run on corresponding invoker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a VERY powerful feature to group invokers depending on the operator's needs without creating multiple clusters. I'm glad it's being introduced with the new scheduler.