Skip to content

Commit 7b99af9

Browse files
authored
[New Scheduler] Initial commit for the scheduler component (#4983)
* Initial commit for the scheduler component * Add a license header * Apply comments. * Move configuration checkups to above. * Add supplementary comments
1 parent cb16450 commit 7b99af9

File tree

10 files changed

+548
-0
lines changed

10 files changed

+548
-0
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ object TransactionId {
232232

233233
val systemPrefix = "sid_"
234234

235+
var containerCreation = TransactionId(systemPrefix + "containerCreation")
235236
val unknown = TransactionId(systemPrefix + "unknown")
236237
val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
237238
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class WhiskConfig(requiredProperties: Map[String, String],
8585
val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit)
8686
val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
8787
val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
88+
89+
val schedulerHost = this(WhiskConfig.schedulerHost)
90+
val schedulerRpcPort = this(WhiskConfig.schedulerRpcPort)
91+
val schedulerAkkaPort = this(WhiskConfig.schedulerAkkaPort)
8892
}
8993

9094
object WhiskConfig {
@@ -190,6 +194,10 @@ object WhiskConfig {
190194
val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
191195
val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
192196
val controllerSeedNodes = "akka.cluster.seed.nodes"
197+
198+
val schedulerHost = "whisk.scheduler.endpoints.host"
199+
val schedulerRpcPort = "whisk.scheduler.endpoints.rpcPort"
200+
val schedulerAkkaPort = "whisk.scheduler.endpoints.akkaPort"
193201
}
194202

195203
object ConfigKeys {

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,3 +427,33 @@ object EventMessage extends DefaultJsonProtocol {
427427

428428
def parse(msg: String) = Try(format.read(msg.parseJson))
429429
}
430+
431+
/**
432+
* This case class is used when retrieving the snapshot of the queue status from the scheduler at a certain moment.
433+
* This is useful to figure out the internal status when any issue happens.
434+
* The following would be an example result.
435+
*
436+
* [
437+
* ...
438+
* {
439+
* "data": "RunningData",
440+
* "fqn": "whisk.system/elasticsearch/[email protected]",
441+
* "invocationNamespace": "style95",
442+
* "status": "Running",
443+
* "waitingActivation": 1
444+
* },
445+
* ...
446+
* ]
447+
*/
448+
object StatusQuery
449+
case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
450+
extends Message {
451+
452+
override def serialize: String = StatusData.serdes.write(this).compactPrint
453+
454+
}
455+
object StatusData extends DefaultJsonProtocol {
456+
457+
implicit val serdes =
458+
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
459+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,17 @@ case class ControllerInstanceId(asString: String) extends InstanceId {
5757
override val toJson: JsValue = ControllerInstanceId.serdes.write(this)
5858
}
5959

60+
case class SchedulerInstanceId(val asString: String) extends InstanceId {
61+
validate(asString)
62+
override val instanceType = "scheduler"
63+
64+
override val source = s"$instanceType$asString"
65+
66+
override val toString: String = source
67+
68+
override val toJson: JsValue = SchedulerInstanceId.serdes.write(this)
69+
}
70+
6071
object InvokerInstanceId extends DefaultJsonProtocol {
6172
def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson))
6273

@@ -112,6 +123,10 @@ object ControllerInstanceId extends DefaultJsonProtocol {
112123
}
113124
}
114125

126+
object SchedulerInstanceId extends DefaultJsonProtocol {
127+
implicit val serdes = jsonFormat(SchedulerInstanceId.apply _, "asString")
128+
}
129+
115130
trait InstanceId {
116131

117132
// controller ids become part of a kafka topic, hence, hence allow only certain characters

core/scheduler/Dockerfile

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
FROM scala
19+
20+
ENV UID=1001 \
21+
NOT_ROOT_USER=owuser
22+
23+
# Copy app jars
24+
ADD build/distributions/scheduler.tar /
25+
26+
COPY init.sh /
27+
RUN chmod +x init.sh
28+
29+
RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER}
30+
USER ${NOT_ROOT_USER}
31+
32+
EXPOSE 8080
33+
CMD ["./init.sh", "0"]

core/scheduler/build.gradle

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
apply plugin: 'scala'
19+
apply plugin: 'application'
20+
apply plugin: 'eclipse'
21+
apply plugin: 'maven'
22+
apply plugin: 'org.scoverage'
23+
24+
ext.dockerImageName = 'scheduler'
25+
apply from: '../../gradle/docker.gradle'
26+
distDocker.dependsOn ':common:scala:distDocker', 'distTar'
27+
28+
project.archivesBaseName = "openwhisk-scheduler"
29+
30+
ext.coverageDirs = [
31+
"${buildDir}/classes/scala/scoverage",
32+
"${project(':common:scala').buildDir.absolutePath}/classes/scala/scoverage"
33+
]
34+
distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'
35+
36+
// Define a separate configuration for managing the dependency on Jetty ALPN agent.
37+
configurations {
38+
alpnagent
39+
}
40+
41+
dependencies {
42+
configurations.all {
43+
resolutionStrategy.force "com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.1.3"
44+
resolutionStrategy.force "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}"
45+
resolutionStrategy.force "com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}"
46+
resolutionStrategy.force "com.typesafe.akka:akka-http2-support_${gradle.scala.depVersion}:${gradle.akka_http.version}"
47+
resolutionStrategy.force "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}"
48+
resolutionStrategy.force "com.typesafe.akka:akka-parsing_${gradle.scala.depVersion}:${gradle.akka_http.version}"
49+
resolutionStrategy.force "com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}"
50+
}
51+
52+
compile "org.scala-lang:scala-library:${gradle.scala.version}"
53+
compile project(':common:scala')
54+
55+
}
56+
57+
mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
58+
applicationDefaultJvmArgs = ["-Djava.security.egd=file:/dev/./urandom"]

core/scheduler/init.sh

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
./copyJMXFiles.sh
20+
21+
export SCHEDULER_OPTS
22+
SCHEDULER_OPTS="$SCHEDULER_OPTS -Dakka.remote.netty.tcp.bind-hostname=$(hostname -i) $(./transformEnvironment.sh)"
23+
24+
exec scheduler/bin/scheduler "$@"

0 commit comments

Comments
 (0)