Skip to content

Commit a8a1f83

Browse files
committed
Implement FCPSchedulerServer
1 parent faae555 commit a8a1f83

File tree

4 files changed

+192
-22
lines changed

4 files changed

+192
-22
lines changed

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala renamed to core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package org.apache.openwhisk.core.scheduler
1919

2020
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
2122
import akka.http.scaladsl.model.StatusCodes
2223
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2324
import akka.http.scaladsl.server.Route
2425
import org.apache.openwhisk.common.{Logging, TransactionId}
2526
import org.apache.openwhisk.http.BasicRasService
2627
import org.apache.openwhisk.http.ErrorResponse.terminate
28+
import spray.json.DefaultJsonProtocol._
2729
import spray.json._
2830

2931
import scala.concurrent.ExecutionContext
@@ -32,7 +34,7 @@ import scala.concurrent.ExecutionContext
3234
* Implements web server to handle certain REST API calls.
3335
* Currently provides a health ping route, only.
3436
*/
35-
class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
37+
class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
3638
implicit val ec: ExecutionContext,
3739
implicit val actorSystem: ActorSystem,
3840
implicit val logger: Logging)
@@ -41,10 +43,28 @@ class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPa
4143
override def routes(implicit transid: TransactionId): Route = {
4244
super.routes ~ extractCredentials {
4345
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
44-
(path("disable") & post) {
46+
(path("state") & get) {
47+
complete {
48+
scheduler.getState.map {
49+
case (list, creationCount) =>
50+
(list
51+
.map(scheduler => scheduler._1.asString -> scheduler._2.toString)
52+
.toMap
53+
++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject
54+
}
55+
}
56+
} ~ (path("disable") & post) {
4557
logger.warn(this, "Scheduler is disabled")
4658
scheduler.disable()
4759
complete("scheduler disabled")
60+
} ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
61+
complete {
62+
scheduler.getQueueSize.map(_.toString)
63+
}
64+
} ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
65+
complete {
66+
scheduler.getQueueStatusData.map(s => s.toJson)
67+
}
4868
}
4969
case _ =>
5070
implicit val jsonPrettyResponsePrinter = PrettyPrinter
@@ -53,21 +73,16 @@ class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPa
5373
}
5474
}
5575

56-
object SchedulerServer {
76+
object FPCSchedulerServer {
5777

58-
val schedulerUsername = {
59-
val source = scala.io.Source.fromFile("/conf/schedulerauth.username")
60-
try source.mkString.replaceAll("\r|\n", "")
61-
finally source.close()
62-
}
63-
val schedulerPassword = {
64-
val source = scala.io.Source.fromFile("/conf/schedulerauth.password")
65-
try source.mkString.replaceAll("\r|\n", "")
66-
finally source.close()
67-
}
78+
// TODO: TBD, after FPCScheduler is ready, can read the credentials from pureconfig
79+
val schedulerUsername = "admin"
80+
val schedulerPassword = "admin"
81+
82+
val queuePathPrefix = "queue"
6883

6984
def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
7085
actorSystem: ActorSystem,
7186
logger: Logging): BasicRasService =
72-
new SchedulerServer(scheduler, schedulerUsername, schedulerPassword)
87+
new FPCSchedulerServer(scheduler, schedulerUsername, schedulerPassword)
7388
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,20 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
8181
val durationChecker = "" // TODO: TBD
8282

8383
override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
84-
Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
84+
Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it
8585
}
8686

8787
override def getQueueSize: Future[Int] = {
88-
Future.successful(0) // TODO: TBD
88+
Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it
8989
}
9090

91-
override def getQueueStatusData: Future[List[String]] = {
92-
Future.successful(List("")) // TODO: TBD
91+
override def getQueueStatusData: Future[List[StatusData]] = {
92+
Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it
9393
}
9494

95-
// other components don't need to shutdown gracefully
9695
override def disable(): Unit = {
9796
logging.info(this, s"Gracefully shutting down the scheduler")
98-
// TODO: TBD, gracefully shut down the container manager and queue manager
97+
// TODO: TBD, after containerManager and queueManager are ready, can implement it
9998
}
10099

101100
private def getUserLimit(invocationNamespace: String): Future[Int] = {
@@ -160,7 +159,7 @@ trait SchedulerCore {
160159

161160
def getQueueSize: Future[Int]
162161

163-
def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
162+
def getQueueStatusData: Future[List[StatusData]]
164163

165164
def disable(): Unit
166165
}
@@ -253,7 +252,7 @@ object Scheduler {
253252
val httpsConfig =
254253
if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
255254

256-
BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, port, httpsConfig)(
255+
BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
257256
actorSystem,
258257
ActorMaterializer.create(actorSystem))
259258

tests/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ task copyMeasurementFiles() {
325325
doLast{
326326
Project common = project(":common:scala")
327327
Project controller = project(":core:controller")
328+
Project scheduler = project(":core:scheduler")
328329
Project invoker = project(":core:invoker")
329330

330331
Properties wskProps = loadWhiskProps()
@@ -335,6 +336,8 @@ task copyMeasurementFiles() {
335336

336337
copyAndRenameMeasurementFile(covLogs, 'controller', "common", common)
337338
copyAndRenameMeasurementFile(covLogs, 'controller', "controller", controller)
339+
copyAndRenameMeasurementFile(covLogs, 'scheduler', "common", common)
340+
copyAndRenameMeasurementFile(covLogs, 'scheduler', "scheduler", scheduler)
338341
copyAndRenameMeasurementFile(covLogs, 'invoker', "common", common)
339342
copyAndRenameMeasurementFile(covLogs, 'invoker', "invoker", invoker)
340343
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler
19+
20+
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
21+
import akka.http.scaladsl.model.StatusCodes._
22+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
23+
import akka.http.scaladsl.server.Route
24+
import akka.http.scaladsl.testkit.ScalatestRouteTest
25+
import common.StreamLogging
26+
import org.apache.openwhisk.common.TransactionId
27+
import org.apache.openwhisk.core.connector.StatusData
28+
import org.apache.openwhisk.core.entity.SchedulerInstanceId
29+
import org.junit.runner.RunWith
30+
import org.scalamock.scalatest.MockFactory
31+
import org.scalatest.junit.JUnitRunner
32+
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
33+
import spray.json.DefaultJsonProtocol._
34+
import spray.json._
35+
36+
import scala.concurrent.Future
37+
38+
/**
39+
* Tests SchedulerServer API.
40+
*/
41+
@RunWith(classOf[JUnitRunner])
42+
class FPCSchedulerServerTests
43+
extends FlatSpec
44+
with BeforeAndAfterEach
45+
with BeforeAndAfterAll
46+
with ScalatestRouteTest
47+
with Matchers
48+
with StreamLogging
49+
with MockFactory {
50+
51+
def transid() = TransactionId("tid")
52+
53+
val systemUsername = "username"
54+
val systemPassword = "password"
55+
56+
val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3))
57+
val creationCount = 1
58+
val testQueueSize = 2
59+
val statusDatas = List(
60+
StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
61+
StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
62+
63+
// Create scheduler
64+
val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas)
65+
val server = new FPCSchedulerServer(scheduler, systemUsername, systemPassword)
66+
67+
override protected def afterEach(): Unit = scheduler.reset()
68+
69+
/** FPCSchedulerServer API tests */
70+
behavior of "FPCSchedulerServer API"
71+
72+
// POST /disable
73+
it should "disable scheduler" in {
74+
implicit val tid = transid()
75+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
76+
Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
77+
status should be(OK)
78+
scheduler.shutdownCount shouldBe 1
79+
}
80+
}
81+
82+
// GET /state
83+
it should "get scheduler state" in {
84+
implicit val tid = transid()
85+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
86+
Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
87+
status should be(OK)
88+
responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map(
89+
"creationCount" -> creationCount.toString)).toJson
90+
}
91+
}
92+
93+
// GET /queue/total
94+
it should "get total queue" in {
95+
implicit val tid = transid()
96+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
97+
Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
98+
status should be(OK)
99+
responseAs[String] shouldBe testQueueSize.toString
100+
}
101+
}
102+
103+
// GET /queue/status
104+
it should "get all queue status" in {
105+
implicit val tid = transid()
106+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
107+
Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
108+
status should be(OK)
109+
responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
110+
}
111+
}
112+
113+
// POST /disable with invalid credential
114+
it should "not call scheduler api with invalid credential" in {
115+
implicit val tid = transid()
116+
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
117+
Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
118+
status should be(Unauthorized)
119+
scheduler.shutdownCount shouldBe 0
120+
}
121+
}
122+
123+
// POST /disable with empty credential
124+
it should "not call scheduler api with empty credential" in {
125+
implicit val tid = transid()
126+
Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
127+
status should be(Unauthorized)
128+
scheduler.shutdownCount shouldBe 0
129+
}
130+
}
131+
132+
}
133+
134+
class TestScheduler(schedulerStates: List[(SchedulerInstanceId, Int)],
135+
creationCount: Int,
136+
queueSize: Int,
137+
statusDatas: List[StatusData])
138+
extends SchedulerCore {
139+
var shutdownCount = 0
140+
141+
override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] =
142+
Future.successful(schedulerStates, creationCount)
143+
144+
override def getQueueSize: Future[Int] = Future.successful(queueSize)
145+
146+
override def getQueueStatusData: Future[List[StatusData]] = Future.successful(statusDatas)
147+
148+
override def disable(): Unit = shutdownCount += 1
149+
150+
def reset(): Unit = {
151+
shutdownCount = 0
152+
}
153+
}

0 commit comments

Comments
 (0)