Skip to content

Commit 2dde4f3

Browse files
committed
Implement InvokerHealthyManager
1 parent ecb1509 commit 2dde4f3

File tree

11 files changed

+1059
-6
lines changed

11 files changed

+1059
-6
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
case object GracefulShutdown
21+
case object Enable

common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ object TransactionId {
236236
val unknown = TransactionId(systemPrefix + "unknown")
237237
val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
238238
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
239+
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
239240
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
240241
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
241242
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
@@ -244,6 +245,7 @@ object TransactionId {
244245
val controller = TransactionId(systemPrefix + "controller") // Controller startup
245246
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
246247
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
248+
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
247249

248250
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
249251

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,33 @@ object EventMessage extends DefaultJsonProtocol {
428428
def parse(msg: String) = Try(format.read(msg.parseJson))
429429
}
430430

431+
case class InvokerResourceMessage(status: String,
432+
freeMemory: Long,
433+
busyMemory: Long,
434+
inProgressMemory: Long,
435+
tags: Seq[String],
436+
dedicatedNamespaces: Seq[String])
437+
extends Message {
438+
439+
/**
440+
* Serializes message to string. Must be idempotent.
441+
*/
442+
override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
443+
}
444+
445+
object InvokerResourceMessage extends DefaultJsonProtocol {
446+
def parse(msg: String): Try[InvokerResourceMessage] = Try(serdes.read(msg.parseJson))
447+
implicit val serdes =
448+
jsonFormat(
449+
InvokerResourceMessage.apply _,
450+
"status",
451+
"freeMemory",
452+
"busyMemory",
453+
"inProgressMemory",
454+
"tags",
455+
"dedicatedNamespaces")
456+
}
457+
431458
/**
432459
* This case class is used when retrieving the snapshot of the queue status from the scheduler at a certain moment.
433460
* This is useful to figure out the internal status when any issue happens.

common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,18 @@ import scala.util.Try
2929
* @param instance a numeric value used for the load balancing and Kafka topic creation
3030
* @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
3131
* @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
32+
* @param userMemory invoker user memory
33+
* @param busyMemory invoker busy memory
34+
* @param tags actions which included specified annotation tags can be run on this invoker
35+
* @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on this invoker
3236
*/
3337
case class InvokerInstanceId(val instance: Int,
3438
uniqueName: Option[String] = None,
3539
displayedName: Option[String] = None,
36-
val userMemory: ByteSize)
40+
val userMemory: ByteSize,
41+
val busyMemory: Option[ByteSize] = None,
42+
val tags: Seq[String] = Seq.empty[String],
43+
val dedicatedNamespaces: Seq[String] = Seq.empty)
3744
extends InstanceId {
3845
def toInt: Int = instance
3946

@@ -76,7 +83,12 @@ object InvokerInstanceId extends DefaultJsonProtocol {
7683
val fields = new ListBuffer[(String, JsValue)]
7784
fields ++= List("instance" -> JsNumber(i.instance))
7885
fields ++= List("userMemory" -> JsString(i.userMemory.toString))
86+
i.busyMemory.foreach { busyMemory =>
87+
fields ++= List("busyMemory" -> JsString(busyMemory.toString))
88+
}
7989
fields ++= List("instanceType" -> JsString(i.instanceType))
90+
fields ++= List("tags" -> JsArray(i.tags.map(_.toJson): _*))
91+
fields ++= List("dedicatedNamespaces" -> JsArray(i.dedicatedNamespaces.map(_.toJson): _*))
8092
i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" -> JsString(uniqueName)))
8193
i.displayedName.foreach(displayedName => fields ++= List("displayedName" -> JsString(displayedName)))
8294
JsObject(fields.toSeq: _*)
@@ -87,10 +99,20 @@ object InvokerInstanceId extends DefaultJsonProtocol {
8799
val uniqueName = fromField[Option[String]](json, "uniqueName")
88100
val displayedName = fromField[Option[String]](json, "displayedName")
89101
val userMemory = fromField[String](json, "userMemory")
102+
val busyMemory = fromField[Option[String]](json, "busyMemory")
90103
val instanceType = fromField[String](json, "instanceType")
104+
val tags = fromField[Seq[String]](json, "tags")
105+
val dedicatedNamespaces = fromField[Seq[String]](json, "dedicatedNamespaces")
91106

92107
if (instanceType == "invoker") {
93-
new InvokerInstanceId(instance, uniqueName, displayedName, ByteSize.fromString(userMemory))
108+
new InvokerInstanceId(
109+
instance,
110+
uniqueName,
111+
displayedName,
112+
ByteSize.fromString(userMemory),
113+
busyMemory.map(ByteSize.fromString(_)),
114+
tags,
115+
dedicatedNamespaces)
94116
} else {
95117
deserializationError("could not read InvokerInstanceId")
96118
}

core/invoker/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses
3737
dependencies {
3838
compile "org.scala-lang:scala-library:${gradle.scala.version}"
3939
compile project(':common:scala')
40+
compile project(':core:scheduler')
4041

4142
compile ("org.apache.curator:curator-recipes:${gradle.curator.version}") {
4243
exclude group: 'org.apache.zookeeper', module:'zookeeper'
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.containerpool.v2
19+
20+
import akka.actor.ActorRef
21+
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
22+
23+
// Event send by the actor
24+
case class ClientCreationCompleted(client: Option[ActorRef] = None)
25+
case object ClientClosed
26+
case object CloseClientProxy
27+
28+
// Event received by the actor
29+
case object StartClient
30+
case class RequestActivation(lastDuration: Option[Long] = None, newScheduler: Option[SchedulerEndpoints] = None)
31+
32+
// TODO, use grpc to fetch activation from memoryQueue
33+
class ActivationClientProxy {}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.containerpool.v2
19+
20+
import java.time.Instant
21+
22+
import akka.actor.ActorRef
23+
import org.apache.openwhisk.common.TransactionId
24+
import org.apache.openwhisk.core.containerpool.Container
25+
import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
26+
import org.apache.openwhisk.core.entity.size._
27+
28+
// Events received by the actor
29+
case class Initialize(invocationNamespace: String,
30+
action: ExecutableWhiskAction,
31+
schedulerHost: String,
32+
rpcPort: Int,
33+
transId: TransactionId)
34+
case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
35+
36+
// Event sent by the actor
37+
case class ContainerCreationFailed(throwable: Throwable)
38+
case class ContainerIsPaused(data: WarmData)
39+
case class ClientCreationFailed(throwable: Throwable,
40+
container: Container,
41+
invocationNamespace: String,
42+
action: ExecutableWhiskAction)
43+
case class ReadyToWork(data: Data)
44+
case class Initialized(data: InitializedData)
45+
case class Resumed(data: WarmData)
46+
case class ResumeFailed(data: WarmData)
47+
case class RecreateClient(action: ExecutableWhiskAction)
48+
case object ContainerRemoved // when container is destroyed
49+
50+
// States
51+
sealed trait ProxyState
52+
case object LeaseStart extends ProxyState
53+
case object Uninitialized extends ProxyState
54+
case object CreatingContainer extends ProxyState
55+
case object ContainerCreated extends ProxyState
56+
case object CreatingClient extends ProxyState
57+
case object ClientCreated extends ProxyState
58+
case object Running extends ProxyState
59+
case object Pausing extends ProxyState
60+
case object Paused extends ProxyState
61+
case object Removing extends ProxyState
62+
case object Rescheduling extends ProxyState
63+
64+
// Data
65+
sealed abstract class Data(val memoryLimit: ByteSize) {
66+
def getContainer: Option[Container]
67+
}
68+
case class NonexistentData() extends Data(0.B) {
69+
override def getContainer = None
70+
}
71+
case class MemoryData(override val memoryLimit: ByteSize) extends Data(memoryLimit) {
72+
override def getContainer = None
73+
}
74+
trait WithClient { val clientProxy: ActorRef }
75+
case class PreWarmData(container: Container, kind: String, override val memoryLimit: ByteSize)
76+
extends Data(memoryLimit) {
77+
override def getContainer = Some(container)
78+
}
79+
80+
case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
81+
extends Data(action.limits.memory.megabytes.MB) {
82+
override def getContainer = Some(container)
83+
}
84+
85+
case class InitializedData(container: Container,
86+
invocationNamespace: String,
87+
action: ExecutableWhiskAction,
88+
override val clientProxy: ActorRef)
89+
extends Data(action.limits.memory.megabytes.MB)
90+
with WithClient {
91+
override def getContainer = Some(container)
92+
}
93+
94+
case class WarmData(container: Container,
95+
invocationNamespace: String,
96+
action: ExecutableWhiskAction,
97+
revision: DocRevision,
98+
lastUsed: Instant,
99+
override val clientProxy: ActorRef)
100+
extends Data(action.limits.memory.megabytes.MB)
101+
with WithClient {
102+
override def getContainer = Some(container)
103+
}
104+
105+
// TODO
106+
class FunctionPullingContainerProxy {}

0 commit comments

Comments
 (0)