[New Scheduler] Implement InvokerHealthyManager #5061

Merged
merged 1 commit into from
Mar 22, 2021
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.common

case object GracefulShutdown
case object Enable
@@ -236,6 +236,7 @@ object TransactionId {
val unknown = TransactionId(systemPrefix + "unknown")
val testing = TransactionId(systemPrefix + "testing") // Common id for unit testing
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
@@ -244,6 +245,7 @@ object TransactionId {
val controller = TransactionId(systemPrefix + "controller") // Controller startup
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation

private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')

@@ -428,6 +428,33 @@ object EventMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(format.read(msg.parseJson))
}

case class InvokerResourceMessage(status: String,
freeMemory: Long,
busyMemory: Long,
inProgressMemory: Long,
tags: Seq[String],
Contributor

What are tags used for?

Contributor Author @ningyougang, Feb 5, 2021

Tags are used for special actions, e.g. ones that need a lot of memory or a powerful GPU.
We can make such an action's invocations run on the matching invokers (BTW, this requirement comes from users in our company).

  • First, add an annotation to the action, e.g.
wsk action create hello-gpu ~/hello-gpu.js --annotation invoker-resources ["gpu"]
  • Second, when deploying an invoker, add the corresponding tag to the invoker as well:
whisk/invokers/0/0
{"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":["gpu"]}
whisk/invokers/1/1
{"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
  • Finally, when the action is run, its activations run on invoker0 above, which carries the tag gpu.
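
The matching rule described in the steps above could be sketched roughly as follows. This is only an illustration, not the scheduler's code from this PR: `TagMatchingSketch`, `InvokerInfo`, and `eligibleInvokers` are hypothetical names, and the behaviour for actions without any `invoker-resources` annotation is an assumption.

```scala
// Illustrative sketch only: these names are hypothetical, not OpenWhisk APIs.
object TagMatchingSketch {
  // Trimmed-down view of the invoker data stored under whisk/invokers/<id>.
  final case class InvokerInfo(id: Int, status: String, tags: Set[String])

  // An invoker qualifies when it is up and carries every tag the action requires.
  // Assumption: an action that requires no tags may run on any healthy invoker.
  def eligibleInvokers(required: Set[String], invokers: Seq[InvokerInfo]): Seq[InvokerInfo] =
    invokers.filter(i => i.status == "up" && required.subsetOf(i.tags))
}
```

With the two invokers listed above, an action annotated with `gpu` would only be eligible for invoker0, while an action without required tags would be eligible for both under this assumption.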

Contributor Author @ningyougang, Feb 5, 2021

BTW, dedicatedNamespaces means that all actions in the listed namespaces run on the corresponding invoker.
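
On the invoker side, this reservation could be expressed as a simple acceptance check. The sketch below is hypothetical (`DedicatedNamespaceSketch`, `InvokerInfo`, and `accepts` are not names from this PR), and it assumes that an invoker with an empty `dedicatedNamespaces` list is shared by every namespace. Whether a dedicated namespace may also fall back to shared invokers is not stated in this thread, so the sketch does not model it.

```scala
// Illustrative sketch only: these names are hypothetical, not OpenWhisk APIs.
object DedicatedNamespaceSketch {
  final case class InvokerInfo(id: Int, dedicatedNamespaces: Set[String])

  // Assumption: an empty list means the invoker is shared by all namespaces;
  // a non-empty list reserves the invoker for exactly those namespaces.
  def accepts(invoker: InvokerInfo, namespace: String): Boolean =
    invoker.dedicatedNamespaces.isEmpty || invoker.dedicatedNamespaces.contains(namespace)
}
```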

Contributor

This is a VERY powerful feature to group invokers depending on the operator's needs without creating multiple clusters. I'm glad it's being introduced with the new scheduler.

dedicatedNamespaces: Seq[String])
extends Message {

/**
* Serializes message to string. Must be idempotent.
*/
override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
}

object InvokerResourceMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[InvokerResourceMessage] = Try(serdes.read(msg.parseJson))
implicit val serdes =
jsonFormat(
InvokerResourceMessage.apply _,
"status",
"freeMemory",
"busyMemory",
"inProgressMemory",
"tags",
"dedicatedNamespaces")
}

/**
* This case class is used when retrieving the snapshot of the queue status from the scheduler at a certain moment.
* This is useful to figure out the internal status when any issue happens.
@@ -29,11 +29,18 @@ import scala.util.Try
* @param instance a numeric value used for the load balancing and Kafka topic creation
* @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
* @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
* @param userMemory invoker user memory
* @param busyMemory invoker busy memory
* @param tags actions annotated with the specified tags can run on this invoker
* @param dedicatedNamespaces only actions in these namespaces can run on this invoker
*/
case class InvokerInstanceId(val instance: Int,
uniqueName: Option[String] = None,
displayedName: Option[String] = None,
val userMemory: ByteSize)
val userMemory: ByteSize,
Contributor

Is this going to affect the message bus? As always, I have to check how this will affect rolling restarts of the controllers and invokers: will one component be unhealthy while the other is upgraded?

Contributor Author

Good question.

I tested this locally and it doesn't affect the message bus: with this PR's invoker running against upstream master's controller, the invoker's health status is up.
But I have run into the problem you describe before. At that time it seems the PingMessage had changed, so an invoker running the new code was reported as unhealthy by a controller running the old code. If this issue comes up, we solve it with a different deployment method, e.g.

  • Remove half of the controllers from nginx.
  • Disable half of the invokers.
  • Deploy the new code to that half of the controllers/invokers.
  • Add that half of the controllers back to nginx.
  • Deploy the other half of the components using the steps above.

val busyMemory: Option[ByteSize] = None,
val tags: Seq[String] = Seq.empty[String],
val dedicatedNamespaces: Seq[String] = Seq.empty)
extends InstanceId {
def toInt: Int = instance

@@ -76,7 +83,12 @@ object InvokerInstanceId extends DefaultJsonProtocol {
val fields = new ListBuffer[(String, JsValue)]
fields ++= List("instance" -> JsNumber(i.instance))
fields ++= List("userMemory" -> JsString(i.userMemory.toString))
i.busyMemory.foreach { busyMemory =>
fields ++= List("busyMemory" -> JsString(busyMemory.toString))
}
fields ++= List("instanceType" -> JsString(i.instanceType))
fields ++= List("tags" -> JsArray(i.tags.map(_.toJson): _*))
fields ++= List("dedicatedNamespaces" -> JsArray(i.dedicatedNamespaces.map(_.toJson): _*))
i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" -> JsString(uniqueName)))
i.displayedName.foreach(displayedName => fields ++= List("displayedName" -> JsString(displayedName)))
JsObject(fields.toSeq: _*)
@@ -87,10 +99,20 @@ object InvokerInstanceId extends DefaultJsonProtocol {
val uniqueName = fromField[Option[String]](json, "uniqueName")
val displayedName = fromField[Option[String]](json, "displayedName")
val userMemory = fromField[String](json, "userMemory")
val busyMemory = fromField[Option[String]](json, "busyMemory")
val instanceType = fromField[String](json, "instanceType")
val tags = fromField[Seq[String]](json, "tags")
val dedicatedNamespaces = fromField[Seq[String]](json, "dedicatedNamespaces")

if (instanceType == "invoker") {
new InvokerInstanceId(instance, uniqueName, displayedName, ByteSize.fromString(userMemory))
new InvokerInstanceId(
instance,
uniqueName,
displayedName,
ByteSize.fromString(userMemory),
busyMemory.map(ByteSize.fromString(_)),
tags,
dedicatedNamespaces)
} else {
deserializationError("could not read InvokerInstanceId")
}
1 change: 1 addition & 0 deletions core/invoker/build.gradle
@@ -37,6 +37,7 @@ distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses
dependencies {
compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')
compile project(':core:scheduler')

compile ("org.apache.curator:curator-recipes:${gradle.curator.version}") {
exclude group: 'org.apache.zookeeper', module:'zookeeper'
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.containerpool.v2

import akka.actor.ActorRef
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints

// Events sent by the actor
case class ClientCreationCompleted(client: Option[ActorRef] = None)
case object ClientClosed
case object CloseClientProxy

// Event received by the actor
case object StartClient
case class RequestActivation(lastDuration: Option[Long] = None, newScheduler: Option[SchedulerEndpoints] = None)

// TODO, use grpc to fetch activation from memoryQueue
class ActivationClientProxy {}
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.containerpool.v2

import java.time.Instant

import akka.actor.ActorRef
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.Container
import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
import org.apache.openwhisk.core.entity.size._

// Events received by the actor
case class Initialize(invocationNamespace: String,
action: ExecutableWhiskAction,
schedulerHost: String,
rpcPort: Int,
transId: TransactionId)
case class Start(exec: CodeExec[_], memoryLimit: ByteSize)

// Event sent by the actor
case class ContainerCreationFailed(throwable: Throwable)
case class ContainerIsPaused(data: WarmData)
case class ClientCreationFailed(throwable: Throwable,
container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction)
case class ReadyToWork(data: Data)
case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
case object ContainerRemoved // when container is destroyed

// States
sealed trait ProxyState
case object LeaseStart extends ProxyState
case object Uninitialized extends ProxyState
case object CreatingContainer extends ProxyState
case object ContainerCreated extends ProxyState
case object CreatingClient extends ProxyState
case object ClientCreated extends ProxyState
case object Running extends ProxyState
case object Pausing extends ProxyState
case object Paused extends ProxyState
case object Removing extends ProxyState
case object Rescheduling extends ProxyState

// Data
sealed abstract class Data(val memoryLimit: ByteSize) {
def getContainer: Option[Container]
}
case class NonexistentData() extends Data(0.B) {
override def getContainer = None
}
case class MemoryData(override val memoryLimit: ByteSize) extends Data(memoryLimit) {
override def getContainer = None
}
trait WithClient { val clientProxy: ActorRef }
case class PreWarmData(container: Container, kind: String, override val memoryLimit: ByteSize)
extends Data(memoryLimit) {
override def getContainer = Some(container)
}

case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
extends Data(action.limits.memory.megabytes.MB) {
override def getContainer = Some(container)
}

case class InitializedData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
override val clientProxy: ActorRef)
extends Data(action.limits.memory.megabytes.MB)
with WithClient {
override def getContainer = Some(container)
}

case class WarmData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
revision: DocRevision,
lastUsed: Instant,
override val clientProxy: ActorRef)
extends Data(action.limits.memory.megabytes.MB)
with WithClient {
override def getContainer = Some(container)
}

// TODO
class FunctionPullingContainerProxy {}