Skip to content

Commit f1829e1

Browse files
authored
[New Scheduler] Manage container creation (#5074)
* Manage container creation * Add ContainerManager Test * Add dedicated namespace * Remove namespace * Apply scala fmt * Add dedicatedNamespaces filter * Add dedicatedNamespaces test * Move InvokerState to common * Unify InvokerHealth message * Add configuration for test * Add license header * Remove compare InvokerResourceMessage
1 parent 9c445f3 commit f1829e1

File tree

20 files changed

+1881
-76
lines changed

20 files changed

+1881
-76
lines changed

ansible/group_vars/all

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,5 +456,6 @@ etcd_connect_string: "{% set ret = [] %}\
456456
{{ ret | join(',') }}"
457457

458458
scheduler:
459+
protocol: "{{ scheduler_protocol | default('http') }}"
459460
dataManagementService:
460461
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,9 +560,11 @@ object LoggingMarkers {
560560
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
561561
else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)
562562

563+
// Time that is needed to produce message in kafka
564+
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
565+
563566
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
564567
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
565-
566568
/*
567569
* General markers
568570
*/

common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,45 @@
1717

1818
package org.apache.openwhisk.common
1919

20+
import org.apache.openwhisk.core.entity.InvokerInstanceId
21+
2022
case object GracefulShutdown
2123
case object Enable
24+
25+
// States an Invoker can be in
26+
sealed trait InvokerState {
27+
val asString: String
28+
val isUsable: Boolean
29+
}
30+
31+
object InvokerState {
32+
// Invokers in this state can be used to schedule workload to
33+
sealed trait Usable extends InvokerState { val isUsable = true }
34+
// No workload should be scheduled to invokers in this state
35+
sealed trait Unusable extends InvokerState { val isUsable = false }
36+
37+
// A completely healthy invoker, pings arriving fine, no system errors
38+
case object Healthy extends Usable { val asString = "up" }
39+
// The invoker can not create a container
40+
case object Unhealthy extends Unusable { val asString = "unhealthy" }
41+
// Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
42+
case object Unresponsive extends Unusable { val asString = "unresponsive" }
43+
// The invoker is down
44+
case object Offline extends Unusable { val asString = "down" }
45+
}
46+
47+
/**
48+
* Describes an abstract invoker. An invoker is a local container pool manager that
49+
* is in charge of the container life cycle management.
50+
*
51+
* @param id a unique instance identifier for the invoker
52+
* @param status it status (healthy, unhealthy, unresponsive, offline)
53+
*/
54+
case class InvokerHealth(id: InvokerInstanceId, status: InvokerState) {
55+
override def equals(obj: scala.Any): Boolean = obj match {
56+
case that: InvokerHealth => that.id == this.id && that.status == this.status
57+
case _ => false
58+
}
59+
60+
override def toString = s"InvokerHealth($id, $status)"
61+
}

common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,11 @@ object TransactionId {
232232

233233
val systemPrefix = "sid_"
234234

235-
var containerCreation = TransactionId(systemPrefix + "containerCreation")
236235
val unknown = TransactionId(systemPrefix + "unknown")
237236
val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
238237
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
239238
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
239+
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
240240
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
241241
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
242242
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
@@ -245,7 +245,9 @@ object TransactionId {
245245
val controller = TransactionId(systemPrefix + "controller") // Controller startup
246246
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
247247
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
248-
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
248+
var containerCreation = TransactionId(systemPrefix + "containerCreation")
249+
var containerDeletion = TransactionId(systemPrefix + "containerDeletion")
250+
val warmUp = TransactionId(systemPrefix + "warmUp")
249251

250252
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
251253

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core
19+
20+
import org.apache.openwhisk.common.TransactionId
21+
import org.apache.openwhisk.core.connector.{ActivationMessage, ContainerCreationMessage}
22+
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
23+
import org.apache.openwhisk.core.entity._
24+
25+
object WarmUp {
26+
val warmUpActionIdentity = {
27+
val whiskSystem = "whisk.system"
28+
val uuid = UUID()
29+
Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
30+
}
31+
32+
private val actionName = "warmUp"
33+
34+
// this action doesn't need to be in database
35+
val warmUpAction = FullyQualifiedEntityName(warmUpActionIdentity.namespace.name.toPath, EntityName(actionName))
36+
37+
def warmUpActivation(controller: ControllerInstanceId) = {
38+
ActivationMessage(
39+
transid = TransactionId.warmUp,
40+
action = warmUpAction,
41+
revision = DocRevision.empty,
42+
user = warmUpActionIdentity,
43+
activationId = new ActivationIdGenerator {}.make(),
44+
rootControllerIndex = controller,
45+
blocking = false,
46+
content = None,
47+
initArgs = Set.empty)
48+
}
49+
50+
def warmUpContainerCreationMessage(scheduler: SchedulerInstanceId) =
51+
ExecManifest.runtimesManifest
52+
.resolveDefaultRuntime("nodejs:default")
53+
.map { manifest =>
54+
val metadata = WhiskActionMetaData(
55+
warmUpAction.path,
56+
warmUpAction.name,
57+
CodeExecMetaDataAsString(manifest, false, entryPoint = None))
58+
ContainerCreationMessage(
59+
TransactionId.warmUp,
60+
warmUpActionIdentity.namespace.name.toString,
61+
warmUpAction,
62+
DocRevision.empty,
63+
metadata,
64+
scheduler,
65+
"",
66+
0)
67+
}
68+
69+
def isWarmUpAction(fqn: FullyQualifiedEntityName): Boolean = {
70+
fqn == warmUpAction
71+
}
72+
}

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ object WhiskConfig {
203203
object ConfigKeys {
204204
val cluster = "whisk.cluster"
205205
val loadbalancer = "whisk.loadbalancer"
206+
val fraction = "whisk.fraction"
206207
val buildInformation = "whisk.info"
207208

208209
val couchdb = "whisk.couchdb"

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ object Annotations {
2424
val RawHttpAnnotationName = "raw-http"
2525
val RequireWhiskAuthAnnotation = "require-whisk-auth"
2626
val ProvideApiKeyAnnotationName = "provide-api-key"
27+
val InvokerResourcesAnnotationName = "invoker-resources"
28+
val InvokerResourcesStrictPolicyAnnotationName = "invoker-resources-strict-policy"
2729
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,8 @@ import akka.http.scaladsl.model.Uri
2626
import akka.http.scaladsl.server.Route
2727
import akka.stream.ActorMaterializer
2828
import kamon.Kamon
29-
import pureconfig._
30-
import pureconfig.generic.auto._
31-
import spray.json.DefaultJsonProtocol._
32-
import spray.json._
3329
import org.apache.openwhisk.common.Https.HttpsConfig
34-
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
30+
import org.apache.openwhisk.common._
3531
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3632
import org.apache.openwhisk.core.connector.MessagingProvider
3733
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
@@ -40,13 +36,17 @@ import org.apache.openwhisk.core.entitlement._
4036
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
4137
import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
4238
import org.apache.openwhisk.core.entity._
43-
import org.apache.openwhisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
39+
import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
4440
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
4541
import org.apache.openwhisk.spi.SpiLoader
42+
import pureconfig._
43+
import spray.json.DefaultJsonProtocol._
44+
import spray.json._
45+
import pureconfig.generic.auto._
4646

47+
import scala.concurrent.Await
4748
import scala.concurrent.ExecutionContext.Implicits
4849
import scala.concurrent.duration.DurationInt
49-
import scala.concurrent.Await
5050
import scala.util.{Failure, Success}
5151

5252
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,28 +43,6 @@ case object GetStatus
4343

4444
case object Tick
4545

46-
// States an Invoker can be in
47-
sealed trait InvokerState {
48-
val asString: String
49-
val isUsable: Boolean
50-
}
51-
52-
object InvokerState {
53-
// Invokers in this state can be used to schedule workload to
54-
sealed trait Usable extends InvokerState { val isUsable = true }
55-
// No workload should be scheduled to invokers in this state
56-
sealed trait Unusable extends InvokerState { val isUsable = false }
57-
58-
// A completely healthy invoker, pings arriving fine, no system errors
59-
case object Healthy extends Usable { val asString = "up" }
60-
// Pings are arriving fine, the invoker returns system errors though
61-
case object Unhealthy extends Unusable { val asString = "unhealthy" }
62-
// Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
63-
case object Unresponsive extends Unusable { val asString = "unresponsive" }
64-
// Pings are not arriving for this invoker
65-
case object Offline extends Unusable { val asString = "down" }
66-
}
67-
6846
// Possible answers of an activation
6947
sealed trait InvocationFinishedResult
7048
object InvocationFinishedResult {

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,18 @@
1717

1818
package org.apache.openwhisk.core.loadBalancer
1919

20-
import scala.concurrent.Future
2120
import akka.actor.{ActorRefFactory, ActorSystem, Props}
2221
import akka.stream.ActorMaterializer
23-
import org.apache.openwhisk.common.{Logging, TransactionId}
22+
import org.apache.openwhisk.common.{InvokerHealth, Logging, TransactionId}
2423
import org.apache.openwhisk.core.WhiskConfig
2524
import org.apache.openwhisk.core.connector._
2625
import org.apache.openwhisk.core.controller.Controller
2726
import org.apache.openwhisk.core.entity._
2827
import org.apache.openwhisk.spi.Spi
2928

29+
import scala.concurrent.Future
3030
import scala.concurrent.duration._
3131

32-
/**
33-
* Describes an abstract invoker. An invoker is a local container pool manager that
34-
* is in charge of the container life cycle management.
35-
*
36-
* @param id a unique instance identifier for the invoker
37-
* @param status it status (healthy, unhealthy, offline)
38-
*/
39-
class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
40-
override def equals(obj: scala.Any): Boolean = obj match {
41-
case that: InvokerHealth => that.id == this.id && that.status == this.status
42-
case _ => false
43-
}
44-
45-
override def toString = s"InvokerHealth($id, $status)"
46-
}
47-
4832
trait LoadBalancer {
4933

5034
/**

0 commit comments

Comments
 (0)